#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Generates Python proto modules and grpc stubs for Beam protos.
"""
import contextlib
import glob
import inspect
import logging
import os
import platform
import re
import shutil
import subprocess
import sys
import time
from collections import defaultdict
from importlib import import_module

import pkg_resources

LOG = logging.getLogger()
LOG.setLevel(logging.INFO)
LICENSE_HEADER = """
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
NO_PROMISES_NOTICE = """
\"\"\"
For internal use only; no backwards-compatibility guarantees.
Automatically generated when running setup.py sdist or build[_py].
\"\"\"
"""
def clean_path(path):
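  """Returns the canonical absolute version of path, resolving symlinks."""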
  return os.path.realpath(os.path.abspath(path))

# These paths are relative to the project root
BEAM_PROTO_PATHS = [
    os.path.join('model', 'pipeline', 'src', 'main', 'proto'),
    os.path.join('model', 'job-management', 'src', 'main', 'proto'),
    os.path.join('model', 'fn-execution', 'src', 'main', 'proto'),
    os.path.join('model', 'interactive', 'src', 'main', 'proto'),
]
PYTHON_SDK_ROOT = os.path.dirname(clean_path(__file__))
PROJECT_ROOT = clean_path(os.path.join(PYTHON_SDK_ROOT, '..', '..'))
PYTHON_OUTPUT_PATH = os.path.join(
    PYTHON_SDK_ROOT, 'apache_beam', 'portability', 'api')
MODEL_RESOURCES = [
    os.path.normpath((
        'model/fn-execution/src/main/resources/org/'
        'apache/beam/model/fnexecution/v1/standard_coders.yaml')),
]

class PythonPath(object):
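  """
  Context manager that temporarily adds a path to sys.path and restores the
  original sys.path on exit. A no-op when path is empty.

  A minimal usage sketch (the directory name is illustrative):

    with PythonPath('/tmp/generated-packages', front=True):
      pass  # imports under /tmp/generated-packages resolve inside the block
  """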
  def __init__(self, path: str, front: bool = False):
    self._path = path
    self._front = front

  def __enter__(self):
    if not self._path:
      return

    self._sys_path = sys.path.copy()
    if self._front:
      sys.path.insert(0, self._path)
    else:
      sys.path.append(self._path)

  def __exit__(self, exc_type, exc_val, exc_tb):
    if not self._path:
      return

    sys.path = self._sys_path

def generate_urn_files(out_dir, api_path):
"""
Create python files with statically defined URN constants.
Creates a <proto>_pb2_urn.py file for each <proto>_pb2.py file that contains
an enum type.
This works by importing each api.<proto>_pb2 module created by `protoc`,
inspecting the module's contents, and generating a new side-car urn module.
This is executed at build time rather than dynamically on import to ensure
that it is compatible with static type checkers like mypy.
"""
  from google.protobuf import message
  from google.protobuf.internal import api_implementation

  if api_implementation.Type() == 'python':
    from google.protobuf.internal import containers
    repeated_types = (
        list,
        containers.RepeatedScalarFieldContainer,
        containers.RepeatedCompositeFieldContainer)
  elif api_implementation.Type() == 'upb':
    from google._upb import _message
    repeated_types = (
        list,
        _message.RepeatedScalarContainer,
        _message.RepeatedCompositeContainer)
  elif api_implementation.Type() == 'cpp':
    from google.protobuf.pyext import _message
    repeated_types = (
        list,
        _message.RepeatedScalarContainer,
        _message.RepeatedCompositeContainer)
  else:
    raise TypeError(
        "Unknown proto implementation: " + api_implementation.Type())

  class Context(object):
    INDENT = '  '
    CAP_SPLIT = re.compile('([A-Z][^A-Z]*|^[a-z]+)')

    def __init__(self, indent=0):
      self.lines = []
      self.imports = set()
      self.empty_types = set()
      self._indent = indent

    @contextlib.contextmanager
    def indent(self):
      self._indent += 1
      yield
      self._indent -= 1

    def prepend(self, s):
      if s:
        self.lines.insert(0, (self.INDENT * self._indent) + s + '\n')
      else:
        self.lines.insert(0, '\n')

    def line(self, s):
      if s:
        self.lines.append((self.INDENT * self._indent) + s + '\n')
      else:
        self.lines.append('\n')

    def import_type(self, typ):
      modname = typ.__module__
      if modname in ('__builtin__', 'builtin'):
        return typ.__name__
      else:
        self.imports.add(modname)
        _, modname = modname.rsplit('.', 1)
        return modname + '.' + typ.__name__

    @staticmethod
    def is_message_type(obj):
      return isinstance(obj, type) and \
             issubclass(obj, message.Message)

    @staticmethod
    def is_enum_type(obj):
      return type(obj).__name__ == 'EnumTypeWrapper'

    def python_repr(self, obj):
      if isinstance(obj, message.Message):
        return self.message_repr(obj)
      elif isinstance(obj, repeated_types):
        return '[%s]' % ', '.join(self.python_repr(x) for x in obj)
      else:
        return repr(obj)

    def empty_type(self, typ):
      name = (
          'EMPTY_' +
          '_'.join(x.upper() for x in self.CAP_SPLIT.findall(typ.__name__)))
      self.empty_types.add('%s = %s()' % (name, self.import_type(typ)))
      return name

    def message_repr(self, msg):
      parts = []
      for field, value in msg.ListFields():
        parts.append('%s=%s' % (field.name, self.python_repr(value)))
      if parts:
        return '%s(%s)' % (self.import_type(type(msg)), ', '.join(parts))
      else:
        return self.empty_type(type(msg))

    def write_enum(self, enum_name, enum, indent):
      ctx = Context(indent=indent)
      with ctx.indent():
        for enum_value_name in enum.values_by_name:
          enum_value_descriptor = enum.values_by_name[enum_value_name]
          extensions = enum_value_descriptor.GetOptions().Extensions
          prop = (
              extensions[beam_runner_api_pb2.beam_urn],
              extensions[beam_runner_api_pb2.beam_constant],
              extensions[metrics_pb2.monitoring_info_spec],
              extensions[metrics_pb2.label_props],
          )
          reprs = [self.python_repr(x) for x in prop]
          if all(x == "''" or x.startswith('EMPTY_') for x in reprs):
            continue
          ctx.line(
              '%s = PropertiesFromEnumValue(%s)' %
              (enum_value_name, ', '.join(self.python_repr(x) for x in prop)))

      if ctx.lines:
        ctx.prepend('class %s(object):' % enum_name)
        ctx.prepend('')
        ctx.line('')
      return ctx.lines

    def write_message(self, message_name, message, indent=0):
      ctx = Context(indent=indent)
      with ctx.indent():
        for obj_name, obj in inspect.getmembers(message):
          if obj_name == 'DESCRIPTOR':
            for enum_name in obj.enum_types_by_name:
              enum = obj.enum_types_by_name[enum_name]
              ctx.lines += self.write_enum(enum_name, enum, ctx._indent)

      if ctx.lines:
        ctx.prepend('class %s(object):' % message_name)
        ctx.prepend('')
      return ctx.lines

  pb2_files = list(glob.glob(os.path.join(out_dir, '*_pb2.py')))

  with PythonPath(os.path.dirname(api_path), front=True):
    beam_runner_api_pb2 = import_module(
        'api.org.apache.beam.model.pipeline.v1.beam_runner_api_pb2')
    metrics_pb2 = import_module(
        'api.org.apache.beam.model.pipeline.v1.metrics_pb2')

    for pb2_file in pb2_files:
      modname = os.path.splitext(pb2_file)[0]
      out_file = modname + '_urns.py'
      api_start_idx = modname.index(os.path.sep + 'api' + os.path.sep)
      import_path = modname[api_start_idx + 1:].replace(os.path.sep, '.')
      mod = import_module(import_path)

      ctx = Context()
      for obj_name, obj in inspect.getmembers(mod):
        if ctx.is_message_type(obj):
          ctx.lines += ctx.write_message(obj_name, obj)

      if ctx.lines:
        for line in reversed(sorted(ctx.empty_types)):
          ctx.prepend(line)

        for modname in reversed(sorted(ctx.imports)):
          pkg, target = modname.rsplit('.', 1)
          rel_import = build_relative_import(api_path, pkg, out_file)
          ctx.prepend('from %s import %s' % (rel_import, target))

        rel_import = build_relative_import(
            os.path.dirname(api_path), 'utils', out_file)
        ctx.prepend('from %s import PropertiesFromEnumValue' % rel_import)

        LOG.info("Writing urn stubs: %s" % out_file)
        with open(out_file, 'w') as f:
          f.writelines(ctx.lines)

def _find_protoc_gen_mypy():
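  """Returns the path to the protoc-gen-mypy plugin binary.

  Searches the running interpreter's bin directory first, then PATH, and
  raises RuntimeError if the plugin cannot be found.
  """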
  # NOTE: this shouldn't be necessary if the virtualenv's environment
  # is passed to tasks below it, since protoc will search the PATH itself
  fname = 'protoc-gen-mypy'
  if platform.system() == 'Windows':
    fname += ".exe"

  pathstr = os.environ.get('PATH')
  search_paths = pathstr.split(os.pathsep) if pathstr else []
  # should typically be installed into the venv's bin dir
  search_paths.insert(0, os.path.dirname(sys.executable))
  for path in search_paths:
    fullpath = os.path.join(path, fname)
    if os.path.exists(fullpath):
      LOG.info('Found protoc_gen_mypy at %s' % fullpath)
      return fullpath
  raise RuntimeError(
      "Could not find %s in %s" % (fname, ', '.join(search_paths)))

def find_by_ext(root_dir, ext):
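  """Yields cleaned paths of files under root_dir whose names end with ext.

  ext may be a single suffix or a tuple of suffixes, as accepted by
  str.endswith.
  """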
  for root, _, files in os.walk(root_dir):
    for file in files:
      if file.endswith(ext):
        yield clean_path(os.path.join(root, file))

def ensure_grpcio_exists():
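  """Ensures grpcio-tools is importable.

  Returns the local install location when a side install was required, or
  None if grpc_tools is already importable.
  """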
  try:
    from grpc_tools import protoc  # pylint: disable=unused-import
  except ImportError:
    return _install_grpcio_tools()

def _install_grpcio_tools():
"""
Though wheels are available for grpcio-tools, setup_requires uses
easy_install which doesn't understand them. This means that it is
compiled from scratch (which is expensive as it compiles the full
protoc compiler). Instead, we attempt to install a wheel in a temporary
directory and add it to the path as needed.
See https://github.com/pypa/setuptools/issues/377
"""
install_path = os.path.join(PYTHON_SDK_ROOT, '.eggs', 'grpcio-wheels')
logging.warning('Installing grpcio-tools into %s', install_path)
start = time.time()
subprocess.check_call([
sys.executable,
'-m',
'pip',
'install',
'--target',
install_path,
'--upgrade',
'-r',
os.path.join(PYTHON_SDK_ROOT, 'build-requirements.txt')
])
logging.warning(
'Installing grpcio-tools took %0.2f seconds.', time.time() - start)
return install_path
def build_relative_import(root_path, import_path, start_file_path):
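  """
  Builds a relative import string from the file at start_file_path to the
  module or package at root_path/import_path (dots in import_path map to
  directories).

  Illustrative example (paths hypothetical): with root '/out/api', import
  path 'org.apache', and a start file '/out/api/org/apache/v1/mod.py', the
  relative import resolves to '..'.
  """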
  tail_path = import_path.replace('.', os.path.sep)
  source_path = os.path.join(root_path, tail_path)

  is_module = os.path.isfile(source_path + '.py')
  if is_module:
    source_path = os.path.dirname(source_path)

  rel_path = os.path.relpath(
      source_path, start=os.path.dirname(start_file_path))

  if rel_path == '.':
    if is_module:
      rel_path += os.path.basename(tail_path)
    return rel_path

  if rel_path.endswith('..'):
    rel_path += os.path.sep

  # In a path that looks like ../../../foo, every double dot except the
  # right-most one needs to be collapsed to a single dot, yielding
  # ././../foo, which we can then convert to ....foo for the proper
  # relative import.
  first_half_idx = rel_path.rfind('..' + os.path.sep)

  if first_half_idx == 0:
    return rel_path.replace(os.path.sep, '')

  first_half = rel_path[:first_half_idx].replace('..', '.')
  final_import = first_half.replace(os.path.sep, '') + '..' + \
      rel_path[first_half_idx + 3:].replace(os.path.sep, '.')

  if is_module:
    if final_import.count('.') == len(final_import):
      return final_import + os.path.basename(tail_path)
    return final_import + '.{}'.format(os.path.basename(tail_path))

  return final_import

def generate_init_files_lite(api_root):
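  """Creates empty __init__.py files throughout the generated 'org' tree."""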
  proto_root = os.path.join(api_root, 'org')
  for root, _, _ in os.walk(proto_root):
    init_file = os.path.join(root, '__init__.py')
    with open(init_file, 'w+'):
      pass

def generate_init_files_full(api_root):
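  """
  Creates __init__.py files that re-export the generated modules: one per
  package under 'org', plus a top-level api/__init__.py that aliases each
  leaf module (appending a numeric suffix when module names collide).
  """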
  proto_root = os.path.join(api_root, 'org')
  api_module_root = os.path.join(api_root, '__init__.py')
  modules = defaultdict(list)

  for root, _, files in os.walk(proto_root):
    init_file = os.path.join(root, '__init__.py')
    with open(init_file, 'w+') as f:
      f.write(LICENSE_HEADER.lstrip())
      for file in files:
        if not file.endswith('.py') or file == '__init__.py':
          continue
        module_name = file.split('.')[0]
        f.write('from . import {}\n'.format(module_name))
        modules[root].append(module_name)

  with open(api_module_root, 'w+') as f:
    f.write(LICENSE_HEADER.lstrip())
    f.write(NO_PROMISES_NOTICE.lstrip())
    remaining_lines = []
    duplicate_modules = {}
    for module_root, modules in modules.items():
      import_path = os.path.relpath(module_root,
                                    api_root).replace(os.path.sep, '.')
      import_root, imported_module = import_path.rsplit('.', 1)
      if imported_module not in duplicate_modules:
        f.write('from .{} import {}\n'.format(import_root, imported_module))
        duplicate_modules[imported_module] = 1
      else:
        duplicate_modules[imported_module] += 1
        module_alias = '{}_{}'.format(
            imported_module, duplicate_modules[imported_module])
        f.write(
            'from .{} import {} as {}\n'.format(
                import_root, imported_module, module_alias))
        imported_module = module_alias

      for module in modules:
        remaining_lines.append(
            '{module} = {}.{module}\n'.format(imported_module, module=module))
    f.write('\n')
    f.writelines(remaining_lines)

def generate_proto_files(force=False):
"""
Will compile proto files for python. If force is not true, then several
heuristics are used to determine whether a compilation is necessary. If
a compilation is not necessary, no compilation will be performed.
:param force: Whether to force a recompilation of the proto files.
"""
  proto_dirs = [
      clean_path(os.path.join(PROJECT_ROOT, path)) for path in BEAM_PROTO_PATHS
  ]
  proto_files = [
      proto_file for d in proto_dirs for proto_file in find_by_ext(d, '.proto')
  ]
  out_files = list(find_by_ext(PYTHON_OUTPUT_PATH, '_pb2.py'))

  if out_files and not proto_files and not force:
    # We have out_files but no protos; assume they're up-to-date.
    # This is actually the common case (e.g. installation from an sdist).
    LOG.info('No proto files; using existing generated files.')
    return
  elif not out_files and not proto_files:
    model = os.path.join(PROJECT_ROOT, 'model')
    if os.path.exists(model):
      error_msg = 'No proto files found in %s.' % proto_dirs
    else:
      error_msg = 'Not in apache git tree, unable to find proto definitions.'
    raise RuntimeError(error_msg)

  if force:
    regenerate_reason = 'forced'
  elif not out_files:
    regenerate_reason = 'no output files'
  elif len(out_files) < len(proto_files):
    regenerate_reason = 'not enough output files'
  elif (min(os.path.getmtime(path) for path in out_files) <= max(
      os.path.getmtime(path)
      for path in proto_files + [os.path.realpath(__file__)])):
    regenerate_reason = 'output files are out-of-date'
  elif len(out_files) > len(proto_files):
    regenerate_reason = 'output files without corresponding .proto files'
    # too many output files: probably due to switching between git branches.
    # remove them so they don't trigger constant regeneration.
    for out_file in out_files:
      os.remove(out_file)
  else:
    regenerate_reason = ''

  if not regenerate_reason:
    LOG.info('Skipping proto regeneration: all files up to date')
    return

  shutil.rmtree(PYTHON_OUTPUT_PATH, ignore_errors=True)
  if not os.path.exists(PYTHON_OUTPUT_PATH):
    os.mkdir(PYTHON_OUTPUT_PATH)

  grpcio_install_loc = ensure_grpcio_exists()
  protoc_gen_mypy = _find_protoc_gen_mypy()
  with PythonPath(grpcio_install_loc):
    from grpc_tools import protoc
    builtin_protos = pkg_resources.resource_filename('grpc_tools', '_proto')
    args = (
        [sys.executable] +  # expecting to be called from the command line
        ['--proto_path=%s' % builtin_protos] +
        ['--proto_path=%s' % d for d in proto_dirs] +
        ['--python_out=%s' % PYTHON_OUTPUT_PATH] +
        ['--plugin=protoc-gen-mypy=%s' % protoc_gen_mypy] +
        # Newer versions of mypy-protobuf convert None to the zero default
        # value and remove Optional from parameter type annotations, which
        # causes some mypy errors. To mitigate this and fall back to the old
        # behavior, use the `relax_strict_optional_primitives` flag. More at
        # https://github.com/nipunn1313/mypy-protobuf/tree/main#relax_strict_optional_primitives # pylint:disable=line-too-long
        ['--mypy_out=relax_strict_optional_primitives:%s' %
         PYTHON_OUTPUT_PATH] +
        # TODO(robertwb): Remove the prefix once it's the default.
        ['--grpc_python_out=grpc_2_0:%s' % PYTHON_OUTPUT_PATH] + proto_files)

    LOG.info('Regenerating Python proto definitions (%s).' % regenerate_reason)
    ret_code = protoc.main(args)
    if ret_code:
      raise RuntimeError(
          'Protoc returned non-zero status (see logs for details): '
          '%s' % ret_code)

    # copy resource files
    for path in MODEL_RESOURCES:
      shutil.copy2(os.path.join(PROJECT_ROOT, path), PYTHON_OUTPUT_PATH)

    proto_packages = set()
    # see: https://github.com/protocolbuffers/protobuf/issues/1491
    # force relative import paths for proto files
    compiled_import_re = re.compile('^from (.*) import (.*)$')
    for file_path in find_by_ext(PYTHON_OUTPUT_PATH,
                                 ('_pb2.py', '_pb2_grpc.py', '_pb2.pyi')):
      proto_packages.add(os.path.dirname(file_path))
      lines = []
      with open(file_path, encoding='utf-8') as f:
        for line in f:
          match_obj = compiled_import_re.match(line)
          if match_obj and \
              match_obj.group(1).startswith('org.apache.beam.model'):
            new_import = build_relative_import(
                PYTHON_OUTPUT_PATH, match_obj.group(1), file_path)
            line = 'from %s import %s\n' % (new_import, match_obj.group(2))

          lines.append(line)

      with open(file_path, 'w') as f:
        f.writelines(lines)

  generate_init_files_lite(PYTHON_OUTPUT_PATH)
  with PythonPath(grpcio_install_loc):
    for proto_package in proto_packages:
      generate_urn_files(proto_package, PYTHON_OUTPUT_PATH)

  generate_init_files_full(PYTHON_OUTPUT_PATH)

if __name__ == '__main__':
  generate_proto_files(force=True)