| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| import importlib |
| import logging |
| import os |
| import secrets |
| import shutil |
| import time |
| import typing |
| import unittest |
| from os.path import dirname |
| |
| import numpy |
| import pytest |
| import yaml |
| |
| import apache_beam as beam |
| from apache_beam.options.pipeline_options import PipelineOptions |
| from apache_beam.testing.test_pipeline import TestPipeline |
| from apache_beam.testing.util import assert_that |
| from apache_beam.testing.util import equal_to |
| from apache_beam.transforms.external import BeamJarExpansionService |
| from apache_beam.transforms.external_transform_provider import STANDARD_URN_PATTERN |
| from apache_beam.transforms.external_transform_provider import ExternalTransform |
| from apache_beam.transforms.external_transform_provider import ExternalTransformProvider |
| from apache_beam.transforms.external_transform_provider import camel_case_to_snake_case |
| from apache_beam.transforms.external_transform_provider import infer_name_from_identifier |
| from apache_beam.transforms.external_transform_provider import snake_case_to_lower_camel_case |
| from apache_beam.transforms.external_transform_provider import snake_case_to_upper_camel_case |
| from apache_beam.transforms.xlang.io import GenerateSequence |
| |
| |
| class NameAndTypeUtilsTest(unittest.TestCase): |
| def test_snake_case_to_upper_camel_case(self): |
| test_cases = [("", ""), ("test", "Test"), ("test_name", "TestName"), |
| ("test_double_underscore", "TestDoubleUnderscore"), |
| ("TEST_CAPITALIZED", "TestCapitalized"), |
| ("_prepended_underscore", "PrependedUnderscore"), |
| ("appended_underscore_", "AppendedUnderscore")] |
| for case in test_cases: |
| self.assertEqual(case[1], snake_case_to_upper_camel_case(case[0])) |
| |
| def test_snake_case_to_lower_camel_case(self): |
| test_cases = [("", ""), ("test", "test"), ("test_name", "testName"), |
| ("test_double_underscore", "testDoubleUnderscore"), |
| ("TEST_CAPITALIZED", "testCapitalized"), |
| ("_prepended_underscore", "prependedUnderscore"), |
| ("appended_underscore_", "appendedUnderscore")] |
| for case in test_cases: |
| self.assertEqual(case[1], snake_case_to_lower_camel_case(case[0])) |
| |
| def test_camel_case_to_snake_case(self): |
| test_cases = [("", ""), ("Test", "test"), ("TestName", "test_name"), |
| ("TestDoubleUnderscore", |
| "test_double_underscore"), ("MyToLoFo", "my_to_lo_fo"), |
| ("BEGINNINGAllCaps", |
| "beginning_all_caps"), ("AllCapsENDING", "all_caps_ending"), |
| ("AllCapsMIDDLEWord", "all_caps_middle_word"), |
| ("lowerCamelCase", "lower_camel_case")] |
| for case in test_cases: |
| self.assertEqual(case[1], camel_case_to_snake_case(case[0])) |
| |
| def test_infer_name_from_identifier(self): |
| standard_test_cases = [ |
| ("beam:schematransform:org.apache.beam:transform:v1", "Transform"), |
| ("beam:schematransform:org.apache.beam:my_transform:v1", |
| "MyTransform"), ( |
| "beam:schematransform:org.apache.beam:my_transform:v2", |
| "MyTransformV2"), |
| ("beam:schematransform:org.apache.beam:fe_fi_fo_fum:v2", "FeFiFoFumV2"), |
| ("beam:schematransform:bad_match:my_transform:v1", None) |
| ] |
| for case in standard_test_cases: |
| self.assertEqual( |
| case[1], infer_name_from_identifier(case[0], STANDARD_URN_PATTERN)) |
| |
| custom_pattern_cases = [ |
| # (<pattern>, <urn>, <expected output>) |
| ( |
| r"^custom:transform:([\w-]+):(\w+)$", |
| "custom:transform:my_transform:v1", |
| "MyTransformV1"), |
| ( |
| r"^org.user:([\w-]+):([\w-]+):([\w-]+):external$", |
| "org.user:some:custom_transform:we_made:external", |
| "SomeCustomTransformWeMade"), |
| ( |
| r"^([\w-]+):user.transforms", |
| "my_eXTErnal:user.transforms", |
| "MyExternal"), |
| (r"^([\w-]+):user.transforms", "my_external:badinput.transforms", None), |
| ] |
| for case in custom_pattern_cases: |
| self.assertEqual(case[2], infer_name_from_identifier(case[1], case[0])) |
| |
| |
| @pytest.mark.uses_io_java_expansion_service |
| @unittest.skipUnless( |
| os.environ.get('EXPANSION_JARS'), |
| "EXPANSION_JARS environment var is not provided, " |
| "indicating that jars have not been built") |
| class ExternalTransformProviderIT(unittest.TestCase): |
| def test_generate_sequence_config_schema_and_description(self): |
| provider = ExternalTransformProvider( |
| BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar")) |
| |
| self.assertTrue(( |
| 'GenerateSequence', |
| 'beam:schematransform:org.apache.beam:generate_sequence:v1' |
| ) in provider.get_available()) |
| |
| GenerateSequence = provider.get('GenerateSequence') |
| config_schema = GenerateSequence.configuration_schema |
| for param in ['start', 'end', 'rate']: |
| self.assertTrue(param in config_schema) |
| |
| description_substring = ( |
| "Outputs a PCollection of Beam Rows, each " |
| "containing a single INT64") |
| self.assertTrue(description_substring in GenerateSequence.description) |
| |
| def test_run_generate_sequence(self): |
| provider = ExternalTransformProvider( |
| BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar")) |
| |
| with beam.Pipeline() as p: |
| numbers = p | provider.GenerateSequence( |
| start=0, end=10) | beam.Map(lambda row: row.value) |
| |
| assert_that(numbers, equal_to([i for i in range(10)])) |
| |
| |
| @pytest.mark.xlang_wrapper_generation |
| @unittest.skipUnless( |
| os.environ.get('EXPANSION_JARS'), |
| "EXPANSION_JARS environment var is not provided, " |
| "indicating that jars have not been built") |
| class AutoGenerationScriptIT(unittest.TestCase): |
| """ |
| This class tests the generation and regeneration operations in |
| `gen_xlang_wrappers.py`. |
| """ |
| |
| # tests cases will use GenerateSequence |
| GEN_SEQ_IDENTIFIER = \ |
| 'beam:schematransform:org.apache.beam:generate_sequence:v1' |
| |
| def setUp(self): |
| # import script from top-level sdks/python directory |
| self.sdk_dir = os.path.abspath(dirname(dirname(dirname(__file__)))) |
| spec = importlib.util.spec_from_file_location( |
| 'gen_xlang_wrappers', |
| os.path.join(self.sdk_dir, 'gen_xlang_wrappers.py')) |
| self.script = importlib.util.module_from_spec(spec) |
| spec.loader.exec_module(self.script) |
| args = TestPipeline(is_integration_test=True).get_full_options_as_args() |
| runner = PipelineOptions(args).get_all_options()['runner'] |
| if runner and "direct" not in runner.lower(): |
| self.skipTest( |
| "It is sufficient to run this test in the DirectRunner " |
| "test suite only.") |
| |
| self.test_dir_name = 'test_gen_script_%d_%s' % ( |
| int(time.time()), secrets.token_hex(3)) |
| self.test_dir = os.path.join( |
| os.path.abspath(dirname(__file__)), self.test_dir_name) |
| self.service_config_path = os.path.join( |
| self.test_dir, "test_expansion_service_config.yaml") |
| self.transform_config_path = os.path.join( |
| self.test_dir, "test_transform_config.yaml") |
| os.mkdir(self.test_dir) |
| |
| def tearDown(self): |
| shutil.rmtree(self.test_dir, ignore_errors=False) |
| |
| def delete_and_validate(self): |
| self.script.delete_generated_files(self.test_dir) |
| self.assertEqual(len(os.listdir(self.test_dir)), 0) |
| |
| def test_script_fails_with_invalid_destinations(self): |
| expansion_service_config = { |
| "gradle_target": 'sdks:java:io:expansion-service:shadowJar', |
| 'destinations': { |
| 'python': 'apache_beam/some_nonexistent_dir' |
| } |
| } |
| with self.assertRaises(ValueError): |
| self.create_and_check_transforms_config_exists(expansion_service_config) |
| |
| def test_pretty_types(self): |
| types = [ |
| typing.Optional[typing.List[str]], |
| numpy.int16, |
| str, |
| typing.Dict[str, numpy.float64], |
| typing.Optional[typing.Dict[str, typing.List[numpy.int64]]], |
| typing.Dict[int, typing.Optional[str]] |
| ] |
| |
| expected_type_names = [('List[str]', True), ('numpy.int16', False), |
| ('str', False), ('Dict[str, numpy.float64]', False), |
| ('Dict[str, List[numpy.int64]]', True), |
| ('Dict[int, Union[str, NoneType]]', False)] |
| |
| for i in range(len(types)): |
| self.assertEqual( |
| self.script.pretty_type(types[i]), expected_type_names[i]) |
| |
| def create_and_check_transforms_config_exists(self, expansion_service_config): |
| with open(self.service_config_path, 'w') as f: |
| yaml.dump([expansion_service_config], f) |
| |
| self.script.generate_transforms_config( |
| self.service_config_path, self.transform_config_path) |
| self.assertTrue(os.path.exists(self.transform_config_path)) |
| |
| def create_and_validate_transforms_config( |
| self, expansion_service_config, expected_name, expected_destination): |
| self.create_and_check_transforms_config_exists(expansion_service_config) |
| |
| with open(self.transform_config_path) as f: |
| configs = yaml.safe_load(f) |
| gen_seq_config = None |
| for config in configs: |
| if config['identifier'] == self.GEN_SEQ_IDENTIFIER: |
| gen_seq_config = config |
| self.assertIsNotNone(gen_seq_config) |
| self.assertEqual( |
| gen_seq_config['default_service'], |
| expansion_service_config['gradle_target']) |
| self.assertEqual(gen_seq_config['name'], expected_name) |
| self.assertEqual( |
| gen_seq_config['destinations']['python'], expected_destination) |
| self.assertIn("end", gen_seq_config['fields']) |
| self.assertIn("start", gen_seq_config['fields']) |
| self.assertIn("rate", gen_seq_config['fields']) |
| |
| def get_module(self, dest): |
| module_name = dest.replace('apache_beam/', '').replace('/', '_') |
| module = 'apache_beam.transforms.%s.%s' % (self.test_dir_name, module_name) |
| return importlib.import_module(module) |
| |
| def write_wrappers_to_destinations_and_validate( |
| self, destinations: typing.List[str]): |
| """ |
| Generate wrappers from the config path and validate all destinations are |
| included. |
| Then write wrappers to destinations and validate all destination paths |
| exist. |
| |
| :return: Generated wrappers grouped by destination |
| """ |
| grouped_wrappers = self.script.get_wrappers_from_transform_configs( |
| self.transform_config_path) |
| for dest in destinations: |
| self.assertIn(dest, grouped_wrappers) |
| |
| # write to our test directory to avoid messing with other files |
| self.script.write_wrappers_to_destinations( |
| grouped_wrappers, self.test_dir, format_code=False) |
| |
| for dest in destinations: |
| self.assertTrue( |
| os.path.exists( |
| os.path.join( |
| self.test_dir, |
| dest.replace('apache_beam/', '').replace('/', '_') + ".py"))) |
| return grouped_wrappers |
| |
| def test_script_workflow(self): |
| expected_destination = 'apache_beam/transforms' |
| expansion_service_config = { |
| "gradle_target": 'sdks:java:io:expansion-service:shadowJar', |
| 'destinations': { |
| 'python': expected_destination |
| } |
| } |
| |
| self.create_and_validate_transforms_config( |
| expansion_service_config, 'GenerateSequence', expected_destination) |
| grouped_wrappers = self.write_wrappers_to_destinations_and_validate( |
| [expected_destination]) |
| # at least the GenerateSequence wrapper is set to this destination |
| self.assertGreaterEqual(len(grouped_wrappers[expected_destination]), 1) |
| |
| # check the wrapper exists in this destination and has correct properties |
| output_module = self.get_module(expected_destination) |
| self.assertTrue(hasattr(output_module, 'GenerateSequence')) |
| # Since our config isn't skipping any transforms, |
| # it should include these two Kafka IOs as well |
| self.assertTrue(hasattr(output_module, 'KafkaWrite')) |
| self.assertTrue(hasattr(output_module, 'KafkaRead')) |
| self.assertTrue( |
| isinstance(output_module.GenerateSequence(start=0), ExternalTransform)) |
| self.assertEqual( |
| output_module.GenerateSequence.identifier, self.GEN_SEQ_IDENTIFIER) |
| |
| self.delete_and_validate() |
| |
| def test_script_workflow_with_modified_transforms(self): |
| modified_name = 'ModifiedSequence' |
| modified_dest = 'apache_beam/io/gcp' |
| expansion_service_config = { |
| "gradle_target": 'sdks:java:io:expansion-service:shadowJar', |
| 'destinations': { |
| 'python': 'apache_beam/transforms' |
| }, |
| 'transforms': { |
| 'beam:schematransform:org.apache.beam:generate_sequence:v1': { |
| 'name': modified_name, |
| 'destinations': { |
| 'python': modified_dest |
| } |
| } |
| } |
| } |
| |
| self.create_and_validate_transforms_config( |
| expansion_service_config, modified_name, modified_dest) |
| |
| grouped_wrappers = self.write_wrappers_to_destinations_and_validate( |
| [modified_dest]) |
| self.assertIn(modified_name, grouped_wrappers[modified_dest][0]) |
| self.assertEqual(len(grouped_wrappers[modified_dest]), 1) |
| |
| # check the modified wrapper exists in the modified destination |
| # and check it has the correct properties |
| output_module = self.get_module(modified_dest) |
| self.assertTrue( |
| isinstance(output_module.ModifiedSequence(start=0), ExternalTransform)) |
| self.assertEqual( |
| output_module.ModifiedSequence.identifier, self.GEN_SEQ_IDENTIFIER) |
| |
| self.delete_and_validate() |
| |
| def test_script_workflow_with_skipped_transform(self): |
| expansion_service_config = { |
| "gradle_target": 'sdks:java:io:expansion-service:shadowJar', |
| 'destinations': { |
| 'python': f'apache_beam/transforms/{self.test_dir_name}' |
| }, |
| 'skip_transforms': [ |
| 'beam:schematransform:org.apache.beam:generate_sequence:v1' |
| ] |
| } |
| |
| with open(self.service_config_path, 'w') as f: |
| yaml.dump([expansion_service_config], f) |
| |
| self.script.generate_transforms_config( |
| self.service_config_path, self.transform_config_path) |
| |
| # gen sequence shouldn't exist in the transform config |
| with open(self.transform_config_path) as f: |
| transforms = yaml.safe_load(f) |
| gen_seq_config = None |
| for transform in transforms: |
| if transform['identifier'] == self.GEN_SEQ_IDENTIFIER: |
| gen_seq_config = transform |
| self.assertIsNone(gen_seq_config) |
| |
| def test_run_pipeline_with_generated_transform(self): |
| with beam.Pipeline() as p: |
| numbers = ( |
| p | GenerateSequence(start=0, end=10) |
| | beam.Map(lambda row: row.value)) |
| assert_that(numbers, equal_to([i for i in range(10)])) |
| |
| def test_check_standard_external_transforms_config_in_sync(self): |
| """ |
| This test creates a transforms config file and checks it against |
| `sdks/standard_external_transforms.yaml`. Fails if the two configs don't |
| match. |
| |
| Fix by running `./gradlew generateExternalTransformsConfig` and |
| committing the changes. |
| """ |
| sdks_dir = os.path.abspath(dirname(self.sdk_dir)) |
| self.script.generate_transforms_config( |
| os.path.join(sdks_dir, 'standard_expansion_services.yaml'), |
| self.transform_config_path) |
| with open(self.transform_config_path) as f: |
| test_config = yaml.safe_load(f) |
| with open(os.path.join(sdks_dir, 'standard_external_transforms.yaml'), |
| 'r') as f: |
| standard_config = yaml.safe_load(f) |
| |
| self.assertEqual( |
| test_config, |
| standard_config, |
| "The standard xlang transforms config file " |
| "\"standard_external_transforms.yaml\" is out of sync! Please update" |
| "by running './gradlew generateExternalTransformsConfig'" |
| "and committing the changes.") |
| |
| |
| if __name__ == '__main__': |
| logging.getLogger().setLevel(logging.INFO) |
| unittest.main() |