|  | # | 
|  | # Licensed to the Apache Software Foundation (ASF) under one or more | 
|  | # contributor license agreements.  See the NOTICE file distributed with | 
|  | # this work for additional information regarding copyright ownership. | 
|  | # The ASF licenses this file to You under the Apache License, Version 2.0 | 
|  | # (the "License"); you may not use this file except in compliance with | 
|  | # the License.  You may obtain a copy of the License at | 
|  | # | 
|  | #    http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, software | 
|  | # distributed under the License is distributed on an "AS IS" BASIS, | 
|  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | # See the License for the specific language governing permissions and | 
|  | # limitations under the License. | 
|  | # | 
|  |  | 
|  | """ | 
|  | Generates Python proto modules and grpc stubs for Beam protos. | 
|  | """ | 
|  | import argparse | 
|  | import contextlib | 
|  | import glob | 
|  | import inspect | 
|  | import logging | 
|  | import os | 
|  | import platform | 
|  | import re | 
|  | import shutil | 
|  | import sys | 
|  | from collections import defaultdict | 
|  | from importlib import import_module | 
|  |  | 
|  | import pkg_resources | 
|  |  | 
# Root logger used by every generation step in this script.
LOG = logging.getLogger()
LOG.setLevel(logging.INFO)

# Apache license header written at the top of every generated __init__.py.
LICENSE_HEADER = """
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""

# Module docstring emitted into the generated top-level api/__init__.py
# (the escaped quotes become a real docstring in the generated file).
NO_PROMISES_NOTICE = """
\"\"\"
For internal use only; no backwards-compatibility guarantees.
Automatically generated when running python -m build.
\"\"\"
"""
|  |  | 
|  |  | 
def clean_path(path):
  """Return *path* as a canonical absolute path (symlinks resolved)."""
  absolute = os.path.abspath(path)
  return os.path.realpath(absolute)
|  |  | 
|  |  | 
# These paths are relative to the project root
BEAM_PROTO_PATHS = [
    os.path.join('model', 'pipeline', 'src', 'main', 'proto'),
    os.path.join('model', 'job-management', 'src', 'main', 'proto'),
    os.path.join('model', 'fn-execution', 'src', 'main', 'proto'),
    os.path.join('model', 'interactive', 'src', 'main', 'proto'),
]

# Directory containing this script; the repository root is two levels up.
PYTHON_SDK_ROOT = os.path.dirname(clean_path(__file__))
PROJECT_ROOT = clean_path(os.path.join(PYTHON_SDK_ROOT, '..', '..'))
# Destination package for generated *_pb2 modules and grpc stubs.
PYTHON_OUTPUT_PATH = os.path.join(
    PYTHON_SDK_ROOT, 'apache_beam', 'portability', 'api')

# Non-proto resource files (relative to PROJECT_ROOT) copied verbatim into
# the generated output package.
MODEL_RESOURCES = [
    os.path.normpath((
        'model/fn-execution/src/main/resources/org/'
        'apache/beam/model/fnexecution/v1/standard_coders.yaml')),
]
|  |  | 
|  |  | 
class PythonPath(object):
  """Context manager that temporarily adds a directory to ``sys.path``.

  A falsy *path* turns the manager into a no-op. When *front* is true the
  path is prepended (highest import priority); otherwise it is appended.
  The previous ``sys.path`` is restored on exit.
  """
  def __init__(self, path: str, front: bool = False):
    self._path = path
    self._front = front

  def __enter__(self):
    if not self._path:
      return

    # Snapshot the current search path so __exit__ can restore it exactly.
    self._sys_path = list(sys.path)
    position = 0 if self._front else len(sys.path)
    sys.path.insert(position, self._path)

  def __exit__(self, exc_type, exc_val, exc_tb):
    if self._path:
      sys.path = self._sys_path
|  |  | 
|  |  | 
def generate_urn_files(out_dir, api_path):
  """
  Create python files with statically defined URN constants.

  Creates a <proto>_pb2_urn.py file for each <proto>_pb2.py file that contains
  an enum type.

  This works by importing each api.<proto>_pb2 module created by `protoc`,
  inspecting the module's contents, and generating a new side-car urn module.
  This is executed at build time rather than dynamically on import to ensure
  that it is compatible with static type checkers like mypy.
  """
  from google.protobuf import message
  from google.protobuf.internal import api_implementation
  # The concrete container classes backing repeated proto fields differ per
  # protobuf runtime (pure python, upb, or cpp); pick the matching tuple so
  # python_repr() below can detect repeated fields with isinstance().
  if api_implementation.Type() == 'python':
    from google.protobuf.internal import containers
    repeated_types = (
        list,
        containers.RepeatedScalarFieldContainer,
        containers.RepeatedCompositeFieldContainer)
  elif api_implementation.Type() == 'upb':
    from google._upb import _message
    repeated_types = (
        list,
        _message.RepeatedScalarContainer,
        _message.RepeatedCompositeContainer)
  elif api_implementation.Type() == 'cpp':
    from google.protobuf.pyext import _message
    repeated_types = (
        list,
        _message.RepeatedScalarContainer,
        _message.RepeatedCompositeContainer)
  else:
    raise TypeError(
        "Unknown proto implementation: " + api_implementation.Type())

  class Context(object):
    # Accumulates generated source lines along with the imports and
    # empty-message aliases those lines require.
    INDENT = '  '
    # Splits CamelCase (or a leading lowercase run) into words, used to
    # derive EMPTY_* constant names from message type names.
    CAP_SPLIT = re.compile('([A-Z][^A-Z]*|^[a-z]+)')

    def __init__(self, indent=0):
      self.lines = []
      self.imports = set()
      self.empty_types = set()
      self._indent = indent

    @contextlib.contextmanager
    def indent(self):
      # Temporarily deepen the indentation level for nested class bodies.
      self._indent += 1
      yield
      self._indent -= 1

    def prepend(self, s):
      # Insert an indented line at the top of the buffer; '' means a blank
      # line.
      if s:
        self.lines.insert(0, (self.INDENT * self._indent) + s + '\n')
      else:
        self.lines.insert(0, '\n')

    def line(self, s):
      # Append an indented line; '' means a blank line.
      if s:
        self.lines.append((self.INDENT * self._indent) + s + '\n')
      else:
        self.lines.append('\n')

    def import_type(self, typ):
      # Return the qualified name to reference *typ* in generated code,
      # recording its module so the matching import can be prepended later.
      modname = typ.__module__
      if modname in ('__builtin__', 'builtin'):
        return typ.__name__
      else:
        self.imports.add(modname)
        _, modname = modname.rsplit('.', 1)
        return modname + '.' + typ.__name__

    @staticmethod
    def is_message_type(obj):
      return isinstance(obj, type) and \
          issubclass(obj, message.Message)

    @staticmethod
    def is_enum_type(obj):
      # Proto enum wrappers are detected by class name rather than import,
      # since the wrapper class lives in runtime-internal modules.
      return type(obj).__name__ == 'EnumTypeWrapper'

    def python_repr(self, obj):
      # Render *obj* as python source: messages recurse, repeated fields
      # become list literals, everything else uses repr().
      if isinstance(obj, message.Message):
        return self.message_repr(obj)
      elif isinstance(obj, repeated_types):
        return '[%s]' % ', '.join(self.python_repr(x) for x in obj)
      else:
        return repr(obj)

    def empty_type(self, typ):
      # Register (and name) a shared EMPTY_<TYPE_NAME> instance for a
      # message with no set fields, so repeated defaults share one object.
      name = (
          'EMPTY_' +
          '_'.join(x.upper() for x in self.CAP_SPLIT.findall(typ.__name__)))
      self.empty_types.add('%s = %s()' % (name, self.import_type(typ)))
      return name

    def message_repr(self, msg):
      # Render a message as a constructor call over its set fields, or as
      # an EMPTY_* alias when nothing is set.
      parts = []
      for field, value in msg.ListFields():
        parts.append('%s=%s' % (field.name, self.python_repr(value)))
      if parts:
        return '%s(%s)' % (self.import_type(type(msg)), ', '.join(parts))
      else:
        return self.empty_type(type(msg))

    def write_enum(self, enum_name, enum, indent):
      # Emit a class holding one PropertiesFromEnumValue constant per enum
      # value that carries Beam extension options; returns the lines (empty
      # if no value has any interesting options).
      ctx = Context(indent=indent)
      with ctx.indent():
        for enum_value_name in enum.values_by_name:
          enum_value_descriptor = enum.values_by_name[enum_value_name]
          extensions = enum_value_descriptor.GetOptions().Extensions
          prop = (
              extensions[beam_runner_api_pb2.beam_urn],
              extensions[beam_runner_api_pb2.beam_constant],
              extensions[metrics_pb2.monitoring_info_spec],
              extensions[metrics_pb2.label_props],
          )
          reprs = [self.python_repr(x) for x in prop]
          # Skip values where every option is empty: nothing to expose.
          if all(x == "''" or x.startswith('EMPTY_') for x in reprs):
            continue
          ctx.line(
              '%s = PropertiesFromEnumValue(%s)' %
              (enum_value_name, ', '.join(self.python_repr(x) for x in prop)))

      if ctx.lines:
        ctx.prepend('class %s(object):' % enum_name)
        ctx.prepend('')
        ctx.line('')
      return ctx.lines

    def write_message(self, message_name, message, indent=0):
      # Emit a class mirroring *message*, containing one nested class per
      # enum declared inside it; returns the lines (empty if no enums).
      ctx = Context(indent=indent)

      with ctx.indent():
        for obj_name, obj in inspect.getmembers(message):
          if obj_name == 'DESCRIPTOR':
            for enum_name in obj.enum_types_by_name:
              enum = obj.enum_types_by_name[enum_name]
              ctx.lines += self.write_enum(enum_name, enum, ctx._indent)

      if ctx.lines:
        ctx.prepend('class %s(object):' % message_name)
        ctx.prepend('')
      return ctx.lines

  pb2_files = list(glob.glob(os.path.join(out_dir, '*_pb2.py')))

  # The generated api package must be importable while we introspect it.
  with PythonPath(os.path.dirname(api_path), front=True):
    beam_runner_api_pb2 = import_module(
        'api.org.apache.beam.model.pipeline.v1.beam_runner_api_pb2')
    metrics_pb2 = import_module(
        'api.org.apache.beam.model.pipeline.v1.metrics_pb2')

    for pb2_file in pb2_files:
      modname = os.path.splitext(pb2_file)[0]
      out_file = modname + '_urns.py'
      # Convert the filesystem path (from the 'api' component down) into a
      # dotted module path for import_module.
      api_start_idx = modname.index(os.path.sep + 'api' + os.path.sep)
      import_path = modname[api_start_idx + 1:].replace(os.path.sep, '.')
      mod = import_module(import_path)

      ctx = Context()
      for obj_name, obj in inspect.getmembers(mod):
        if ctx.is_message_type(obj):
          ctx.lines += ctx.write_message(obj_name, obj)

      # Only write a side-car module when something was generated.
      if ctx.lines:
        # Prepend (in reverse so final order is sorted): empty-type aliases,
        # then the imports they and the constants need.
        for line in reversed(sorted(ctx.empty_types)):
          ctx.prepend(line)

        for modname in reversed(sorted(ctx.imports)):
          pkg, target = modname.rsplit('.', 1)
          rel_import = build_relative_import(api_path, pkg, out_file)
          ctx.prepend('from %s import %s' % (rel_import, target))

        rel_import = build_relative_import(
            os.path.dirname(api_path), 'utils', out_file)
        ctx.prepend('from %s import PropertiesFromEnumValue' % rel_import)

        LOG.info("Writing urn stubs: %s" % out_file)
        with open(out_file, 'w') as f:
          f.writelines(ctx.lines)
|  |  | 
|  |  | 
def _find_protoc_gen_mypy():
  """Return the full path of the protoc-gen-mypy plugin executable.

  The directory of the running interpreter (typically the virtualenv's bin
  directory) is searched first, then every entry of $PATH.

  Raises:
    RuntimeError: if the executable cannot be found anywhere.
  """
  # NOTE: this shouldn't be necessary if the virtualenv's environment
  #  is passed to tasks below it, since protoc will search the PATH itself
  fname = 'protoc-gen-mypy'
  if platform.system() == 'Windows':
    fname += ".exe"

  env_path = os.environ.get('PATH')
  search_paths = [os.path.dirname(sys.executable)]
  if env_path:
    search_paths.extend(env_path.split(os.pathsep))

  for directory in search_paths:
    candidate = os.path.join(directory, fname)
    if os.path.exists(candidate):
      LOG.info('Found protoc_gen_mypy at %s' % candidate)
      return candidate

  raise RuntimeError(
      "Could not find %s in %s" % (fname, ', '.join(search_paths)))
|  |  | 
|  |  | 
def find_by_ext(root_dir, ext):
  """Recursively yield canonical paths of files under *root_dir* whose
  names end with *ext* (a suffix string, or a tuple of suffixes)."""
  for dirpath, _, filenames in os.walk(root_dir):
    matching = (name for name in filenames if name.endswith(ext))
    for name in matching:
      yield clean_path(os.path.join(dirpath, name))
|  |  | 
def build_relative_import(root_path, import_path, start_file_path):
  """Translate an absolute dotted *import_path* (rooted at *root_path*) into
  a relative import usable from the file at *start_file_path*.

  A path that resolves to ``<root_path>/<tail>.py`` is treated as a module
  (the final component becomes the imported name); anything else is treated
  as a package directory.
  """
  tail_path = import_path.replace('.', os.path.sep)
  source_path = os.path.join(root_path, tail_path)

  is_module = os.path.isfile(source_path + '.py')
  if is_module:
    # Import is relative to the module's containing package directory.
    source_path = os.path.dirname(source_path)

  rel_path = os.path.relpath(
      source_path, start=os.path.dirname(start_file_path))

  # Same directory: '.' for a package, '.<name>' for a module.
  if rel_path == '.':
    if is_module:
      return '.' + os.path.basename(tail_path)
    return rel_path

  # Normalize a trailing '..' so the collapsing logic below always sees
  # separator-terminated parent components.
  if rel_path.endswith('..'):
    rel_path += os.path.sep

  # In a path that looks like ../../../foo, every double dot
  # after the right most double dot needs to be collapsed to
  # a single dot to look like ././../foo to which we can convert
  # to ....foo for the proper relative import.
  last_parent_idx = rel_path.rfind('..' + os.path.sep)
  if last_parent_idx == 0:
    # Single parent hop: '../foo' -> '..foo'.
    return rel_path.replace(os.path.sep, '')

  leading = rel_path[:last_parent_idx].replace('..', '.')
  leading = leading.replace(os.path.sep, '')
  trailing = rel_path[last_parent_idx + 3:].replace(os.path.sep, '.')
  final_import = leading + '..' + trailing

  if is_module:
    if final_import.count('.') == len(final_import):
      # Purely dots: the module sits directly at that package level.
      return final_import + os.path.basename(tail_path)
    return final_import + '.' + os.path.basename(tail_path)

  return final_import
|  |  | 
|  |  | 
def generate_init_files_lite(api_root):
  """Drop an empty __init__.py into every directory under <api_root>/org so
  the generated tree is importable as packages."""
  proto_root = os.path.join(api_root, 'org')
  for dirpath, _, _ in os.walk(proto_root):
    # Create (or truncate) the marker file; no content is needed here.
    open(os.path.join(dirpath, '__init__.py'), 'w+').close()
|  |  | 
|  |  | 
def generate_init_files_full(api_root):
  """Write licensed __init__.py files that re-export the generated modules.

  Each package directory under <api_root>/org gets an __init__.py importing
  its sibling modules; <api_root>/__init__.py then re-exports every leaf
  package and aliases each generated module at the top level.
  """
  proto_root = os.path.join(api_root, 'org')
  api_module_root = os.path.join(api_root, '__init__.py')
  # Maps each package directory -> list of module names it contains.
  modules = defaultdict(list)

  for root, _, files in os.walk(proto_root):
    init_file = os.path.join(root, '__init__.py')
    with open(init_file, 'w+') as f:
      f.write(LICENSE_HEADER.lstrip())
      for file in files:
        if not file.endswith('.py') or file == '__init__.py':
          continue
        module_name = file.split('.')[0]
        f.write('from . import {}\n'.format(module_name))
        modules[root].append(module_name)

  with open(api_module_root, 'w+') as f:
    f.write(LICENSE_HEADER.lstrip())
    f.write(NO_PROMISES_NOTICE.lstrip())
    # Module-alias assignments, emitted after all the imports.
    remaining_lines = []

    # Tracks how often each leaf package name has been seen, so name
    # collisions (e.g. two 'v1' packages) get numbered aliases.
    duplicate_modules = {}
    # NOTE: the loop variable deliberately rebinds 'modules' (the per-package
    # module list), shadowing the dict being iterated.
    for module_root, modules in modules.items():
      import_path = os.path.relpath(module_root,
                                    api_root).replace(os.path.sep, '.')
      import_root, imported_module = import_path.rsplit('.', 1)

      if imported_module not in duplicate_modules:
        f.write('from .{} import {}\n'.format(import_root, imported_module))
        duplicate_modules[imported_module] = 1
      else:
        # Second and later occurrences are imported under a numbered alias.
        duplicate_modules[imported_module] += 1
        module_alias = '{}_{}'.format(
            imported_module, duplicate_modules[imported_module])
        f.write(
            'from .{} import {} as {}\n'.format(
                import_root, imported_module, module_alias))
        imported_module = module_alias

      for module in modules:
        remaining_lines.append(
            '{module} = {}.{module}\n'.format(imported_module, module=module))
    f.write('\n')
    f.writelines(remaining_lines)
|  |  | 
|  |  | 
def generate_proto_files(force=False):
  """
  Will compile proto files for python. If force is not true, then several
  heuristics are used to determine whether a compilation is necessary. If
  a compilation is not necessary, no compilation will be performed.
  :param force: Whether to force a recompilation of the proto files.
  """
  proto_dirs = [
      clean_path(os.path.join(PROJECT_ROOT, path)) for path in BEAM_PROTO_PATHS
  ]
  proto_files = [
      proto_file for d in proto_dirs for proto_file in find_by_ext(d, '.proto')
  ]

  out_files = list(find_by_ext(PYTHON_OUTPUT_PATH, '_pb2.py'))

  if out_files and not proto_files and not force:
    # We have out_files but no protos; assume they're up-to-date.
    # This is actually the common case (e.g. installation from an sdist).
    LOG.info('No proto files; using existing generated files.')
    return

  elif not out_files and not proto_files:
    model = os.path.join(PROJECT_ROOT, 'model')
    if os.path.exists(model):
      error_msg = 'No proto files found in %s.' % proto_dirs
    else:
      error_msg = 'Not in apache git tree, unable to find proto definitions.'

    raise RuntimeError(error_msg)

  # Decide whether regeneration is needed; an empty reason means "skip".
  if force:
    regenerate_reason = 'forced'
  elif not out_files:
    regenerate_reason = 'no output files'
  elif len(out_files) < len(proto_files):
    regenerate_reason = 'not enough output files'
  elif (min(os.path.getmtime(path) for path in out_files) <= max(
      os.path.getmtime(path)
      for path in proto_files + [os.path.realpath(__file__)])):
    # Any proto (or this script itself) newer than the oldest output.
    regenerate_reason = 'output files are out-of-date'
  elif len(out_files) > len(proto_files):
    regenerate_reason = 'output files without corresponding .proto files'
    # too many output files: probably due to switching between git branches.
    # remove them so they don't trigger constant regeneration.
    for out_file in out_files:
      os.remove(out_file)
  else:
    regenerate_reason = ''

  if not regenerate_reason:
    LOG.info('Skipping proto regeneration: all files up to date')
    return

  # Start from a clean output directory.
  shutil.rmtree(PYTHON_OUTPUT_PATH, ignore_errors=True)
  if not os.path.exists(PYTHON_OUTPUT_PATH):
    os.mkdir(PYTHON_OUTPUT_PATH)

  protoc_gen_mypy = _find_protoc_gen_mypy()
  from grpc_tools import protoc
  # NOTE(review): pkg_resources is deprecated in setuptools; consider
  # importlib.resources here — verify grpc_tools packaging first.
  builtin_protos = pkg_resources.resource_filename('grpc_tools', '_proto')
  args = (
      [sys.executable] +  # expecting to be called from command line
      ['--proto_path=%s' % builtin_protos] +
      ['--proto_path=%s' % d
       for d in proto_dirs] + ['--python_out=%s' % PYTHON_OUTPUT_PATH] +
      ['--plugin=protoc-gen-mypy=%s' % protoc_gen_mypy] +
      # new version of mypy-protobuf converts None to zero default value
      # and remove Optional from the param type annotation. This causes
      # some mypy errors. So to mitigate and fall back to old behavior,
      # use `relax_strict_optional_primitives` flag. more at
      # https://github.com/nipunn1313/mypy-protobuf/tree/main#relax_strict_optional_primitives # pylint:disable=line-too-long
      ['--mypy_out=relax_strict_optional_primitives:%s' % PYTHON_OUTPUT_PATH
      ] +
      # TODO(robertwb): Remove the prefix once it's the default.
      ['--grpc_python_out=grpc_2_0:%s' % PYTHON_OUTPUT_PATH] + proto_files)

  LOG.info('Regenerating Python proto definitions (%s).' % regenerate_reason)
  ret_code = protoc.main(args)
  if ret_code:
    raise RuntimeError(
        'Protoc returned non-zero status (see logs for details): '
        '%s' % ret_code)

  # copy resource files
  for path in MODEL_RESOURCES:
    shutil.copy2(os.path.join(PROJECT_ROOT, path), PYTHON_OUTPUT_PATH)

  proto_packages = set()
  # see: https://github.com/protocolbuffers/protobuf/issues/1491
  # force relative import paths for proto files
  compiled_import_re = re.compile('^from (.*) import (.*)$')
  for file_path in find_by_ext(PYTHON_OUTPUT_PATH,
                               ('_pb2.py', '_pb2_grpc.py', '_pb2.pyi')):
    proto_packages.add(os.path.dirname(file_path))
    lines = []
    with open(file_path, encoding='utf-8') as f:
      for line in f:
        match_obj = compiled_import_re.match(line)
        # Only rewrite imports of generated Beam model packages.
        if match_obj and \
            match_obj.group(1).startswith('org.apache.beam.model'):
          new_import = build_relative_import(
              PYTHON_OUTPUT_PATH, match_obj.group(1), file_path)
          line = 'from %s import %s\n' % (new_import, match_obj.group(2))

        lines.append(line)

    with open(file_path, 'w') as f:
      f.writelines(lines)

  # Minimal __init__.py files first so urn generation can import the tree;
  # full re-exporting __init__.py files are written last.
  generate_init_files_lite(PYTHON_OUTPUT_PATH)
  for proto_package in proto_packages:
    generate_urn_files(proto_package, PYTHON_OUTPUT_PATH)

  generate_init_files_full(PYTHON_OUTPUT_PATH)
|  |  | 
|  |  | 
if __name__ == '__main__':
  # Regeneration is forced by default; --no-force falls back to the
  # mtime/count heuristics inside generate_proto_files().
  arg_parser = argparse.ArgumentParser()
  arg_parser.add_argument('--no-force', dest='force', action='store_false')
  cli_args = arg_parser.parse_args()
  generate_proto_files(force=cli_args.force)