| # ------------------------------------------------------------- |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| # ------------------------------------------------------------- |
| |
| import json |
| import os |
| import re |
| import sys |
| import traceback |
| from dml_parser import FunctionParser |
| from typing import List, Tuple |
| |
| |
# Builtins whose Python wrappers are maintained by hand instead of being
# generated; PythonAPIFileGenerator still lists them in the package __init__
# as long as their module file exists in the target directory.
manually_added_algorithm_builtins = [
    "cov",
    "solve",
]
| |
| |
| class PythonAPIFileGenerator(object): |
| |
| target_path = os.path.join(os.path.dirname(os.path.dirname( |
| __file__)), 'systemds', 'operator', 'algorithm', 'builtin') |
| test_path = os.path.join(os.path.dirname(os.path.dirname( |
| __file__)), 'tests', 'auto_tests') |
| rst_path = os.path.join(os.path.dirname(os.path.dirname( |
| __file__)), 'docs', 'source', 'api', 'operator', 'algorithms') |
| license_path = os.path.join('resources', 'template_python_script_license') |
| rst_license_path = os.path.join('resources', 'template_python_script_rst_license') |
| template_path = os.path.join('resources', 'template_python_script_imports') |
| |
| source_path: str |
| license: str |
| rst_license: str |
| imports: str |
| generated_by: str |
| generated_from: str |
| |
| init_path = os.path.join(os.path.dirname(os.path.dirname( |
| __file__)), 'systemds', 'operator', 'algorithm', '__init__.py') |
| init_import = u"from .builtin.{function} import {function} \n" |
| init_all = u"__all__ = {functions} \n" |
| |
| def __init__(self, source_path: str, extension: str = 'py'): |
| super(PythonAPIFileGenerator, self).__init__() |
| self.source_path = source_path |
| |
| self.extension = '.{extension}'.format(extension=extension) |
| os.makedirs(self.__class__.target_path, exist_ok=True) |
| os.makedirs(self.__class__.test_path, exist_ok=True) |
| os.makedirs(self.__class__.rst_path, exist_ok=True) |
| self.function_names = list() |
| for name in manually_added_algorithm_builtins: |
| # only add files which actually exist, to avoid breaking |
| if self.check_manually_added_file(name + self.extension): |
| self.function_names.append(name) |
| |
| path = os.path.dirname(__file__) |
| |
| with open(os.path.join(path, self.__class__.template_path), 'r') as f: |
| self.imports = f.read() |
| with open(os.path.join(path, self.__class__.license_path), 'r') as f: |
| self.license = f.read() |
| with open(os.path.join(path, self.__class__.rst_license_path), 'r') as f: |
| self.rst_license = f.read() |
| |
| self.generated_by = "# Autogenerated By : src/main/python/generator/generator.py\n" |
| self.generated_from = "# Autogenerated From : " |
| |
| def check_manually_added_file(self, name: str): |
| path = os.path.join(self.target_path, name) |
| exists = os.path.isfile(path) |
| if not exists: |
| print("[ERROR] Manually added builtin algorithm not found : \'{file_name}\' \n .".format(file_name=path)) |
| return exists |
| |
| def generate_file(self, filename: str, file_content: str, dml_file: str): |
| """ |
| Generates file in self.path with name file_name |
| and given file_contents as content |
| """ |
| path = os.path.dirname(__file__) |
| |
| target_file = os.path.join(self.target_path, filename) + self.extension |
| with open(target_file, "w") as new_script: |
| new_script.write(self.license) |
| new_script.write(self.generated_by) |
| relative_path = os.path.relpath(dml_file, start=self.source_path) |
| new_script.write(f"{self.generated_from}scripts/builtin/{relative_path}\n") |
| new_script.write(self.imports) |
| new_script.write(file_content) |
| |
| self.function_names.append(filename) |
| |
| def generate_test_file(self, function_name: str, code_block: str = None): |
| """ |
| Generates a test file for the given function |
| """ |
| target_file = os.path.join(self.test_path, f"test_{function_name}") + self.extension |
| with open(target_file, "w") as test_script: |
| test_script.write(self.license) |
| test_script.write(self.generated_by) |
| test_script.write("import unittest, contextlib, io\n\n\n") |
| |
| test_script.write(f"class Test{function_name.upper()}(unittest.TestCase):\n") |
| test_script.write(f" def test_{function_name}(self):\n") |
| if code_block: |
| test_script.write(" # Example test case provided in python the code block\n") |
| test_script.write(" buf = io.StringIO()\n") |
| test_script.write(" with contextlib.redirect_stdout(buf):\n") |
| |
| expected ="" |
| for raw_line in code_block.splitlines(keepends=True): # keepends=True ā ā\nā is preserved |
| stripped = raw_line.lstrip() |
| if stripped.startswith((">>>", "...")): |
| code_line = stripped[4:] |
| if code_line.strip(): |
| test_script.write(f" {code_line}") |
| else: |
| test_script.write("\n") |
| else: |
| expected += raw_line |
| expected = expected.lstrip("\n") |
| test_script.write(f'\n expected = """{expected}"""\n') |
| test_script.write(f" self.assertEqual(buf.getvalue().strip(), expected)\n\n") |
| |
| test_script.write("\nif __name__ == '__main__':\n") |
| test_script.write(" unittest.main()\n") |
| |
| def generate_rst_file(self, function_name: str): |
| """ |
| Generates an rst file for the given function |
| """ |
| target_file = os.path.join(self.rst_path, f"{function_name}") + ".rst" |
| with open(target_file, "w") as rst_script: |
| rst_script.write(self.rst_license) |
| rst_script.write("\n\n") |
| rst_script.write(function_name + "\n") |
| rst_script.write("=" * len(function_name) + "\n\n") |
| rst_script.write(f".. autofunction:: systemds.operator.algorithm.{function_name}") |
| |
| def generate_init_file(self): |
| with open(self.init_path, "w") as init_file: |
| init_file.write(self.license) |
| init_file.write(self.generated_by) |
| init_file.write("\n") |
| for f in self.function_names: |
| init_file.write(self.init_import.format(function=f)) |
| init_file.write("\n") |
| init_file.write(self.init_all.format( |
| functions=self.function_names).replace(",", ",\n")) |
| |
| |
class PythonAPIFunctionGenerator(object):
    """
    Turns the parsed description of a dml builtin into the source text of
    the matching Python API function.
    """

    # The body lines carry the 4-space indentation of the generated function.
    api_template = u"""def {function_name}({parameters}):
    {header}
    {params_dict}
    {api_call}\n"""

    kwargs_parameter_string = u"**kwargs: Dict[str, VALID_INPUT_TYPES]"
    kwargs_result = u"params_dict.update(kwargs)"

    type_mapping_file = os.path.join('resources', 'type_mapping.json')

    # leading part of a type string up to the first '[' or whitespace
    type_mapping_pattern = r"^([^\[\s]+)"

    path = os.path.dirname(__file__)
    type_mapping_path = os.path.join(path, type_mapping_file)

    with open(type_mapping_path, 'r') as mapping:
        type_mapping = json.load(mapping)

    # dml return type (lower case) -> Python node class used for the output
    _return_object_types = {
        "matrix": "Matrix",
        "frame": "Frame",
        "double": "Scalar",
        "boolean": "Scalar",
        "integer": "Scalar",
        "list": "List",
    }

    def __init__(self):
        super(PythonAPIFunctionGenerator, self).__init__()

    def generate_function(self, data: dict) -> str:
        """
        Generates function definition for PythonAPI
        @param data:
            {
                'function_name': 'some_name',
                'function_header': 'header contained in \"\"\"'
                'parameters': [('param1','type','default_value'), ...],
                'return_values': [('retval1', 'type'),...]
            }
        @return: function definition
        """
        function_name = data['function_name']
        parameters = self.format_param_string(
            data['parameters'], len(function_name))
        header = data['function_header'] or ""
        params_dict = self.format_params_dict_string(data['parameters'])
        api_call = self.format_api_call(
            data['parameters'],
            data['return_values'],
            function_name
        )
        return self.__class__.api_template.format(
            function_name=function_name, parameters=parameters, header=header,
            params_dict=params_dict, api_call=api_call)

    def replace_types(self, item: str):
        """
        Maps a dml type string to its Python type via the "type" table of
        the type mapping. Empty input, input without a leading type name
        and unmapped types are returned unchanged.
        """
        if not item:
            return item
        match = re.search(self.__class__.type_mapping_pattern,
                          str(item).lower())
        if match is None:
            return item
        return self.__class__.type_mapping["type"].get(match.group(), item)

    def format_param_string(self, parameters: List[Tuple[str]], nameLength: int) -> str:
        """
        Builds the parameter list of the generated signature.

        Parameters without default value are listed as ``name: type``;
        parameters with a default are not listed and are instead covered by
        one trailing ``**kwargs`` entry. Continuation lines are aligned
        with the opening parenthesis of ``def <name>(``.

        @param parameters: (name, type, default) records; not modified
        @param nameLength: length of the function name, used for alignment
        @raise AttributeError: if a name or mapped type still contains '['
            or formatting fails for any other reason
        """
        try:
            # len("def ") + len(name) + len("(") columns of padding
            newline_spacing = "\n" + " " * (nameLength + 5)
            required = []
            has_optional = False

            for param in parameters:
                name = param[0]
                typ = self.replace_types(param[1])

                if "[" in typ or "[" in name:
                    raise AttributeError(
                        "Failed parsing param" + str(param) + "\n" + str(parameters))
                if param[2] is not None:
                    has_optional = True
                else:
                    required.append("{name}: {typ}".format(name=name, typ=typ))

            result = ("," + newline_spacing).join(required)
            if has_optional:
                kwargs = self.__class__.kwargs_parameter_string
                if result:
                    result = u"{result},{nl}{kwargs}".format(
                        result=result, kwargs=kwargs, nl=newline_spacing)
                else:
                    # no required parameter: avoid a leading comma
                    result = kwargs

            return result
        except Exception as e:
            raise AttributeError("Failed Formatting parameter strings: " +
                                 str(parameters) + " " + format_exception(e)) from e

    def format_params_dict_string(self, parameters: List[Tuple[str]]) -> str:
        """
        Builds the statement(s) creating ``params_dict`` in the generated
        function: a dict literal of all parameters without default value,
        followed by ``params_dict.update(kwargs)`` if any parameter has a
        default. Returns "" when there are no parameters at all.
        """
        if not len(parameters):
            return ""
        required = [param[0] for param in parameters if param[2] is None]
        has_optional = len(required) != len(parameters)

        entries = ", ".join(
            u"\'{name}\': {name}".format(name=name) for name in required)
        # an empty literal is emitted when every parameter is optional
        result = u"params_dict = {" + entries + u"}"
        if has_optional:
            result = u"{result}\n    {kwargs}".format(
                result=result,
                kwargs=self.__class__.kwargs_result
            )
        return result

    def format_api_call(self,
                        parameters: List[Tuple[str]],
                        return_values: List[Tuple[str]],
                        function_name: str
                        ) -> str:
        """
        Builds the return statement(s) of the generated function.

        ``return_values`` of None yields an ``OperationNode`` with
        ``OutputType.NONE``, a single return value yields a ``Matrix`` and
        several return values yield a ``MultiReturn`` with one output node
        each. The context is taken from the first parameter.

        @raise AttributeError: if there is no parameter to take the context
            from or the single return type cannot be parsed
        """
        if not parameters:
            raise AttributeError(
                "Cannot derive sds_context without parameters: " + function_name)
        sds_context = "{param}.sds_context".format(param=parameters[0][0])
        pattern = r"^[^\[]+"

        if return_values is None:
            return ("return OperationNode({sds_context}," +
                    "\n        \'{function_name}\'," +
                    "\n        named_input_nodes=params_dict," +
                    "\n        output_type=OutputType.NONE)").format(
                sds_context=sds_context,
                function_name=function_name
            )

        if len(return_values) > 1:
            output_nodes_str, op_assignments = self.generate_output_nodes(
                return_values, pattern, sds_context)
            multi_return_str = self.generate_multireturn(
                sds_context, function_name)
            return "\n{out_nodes}\n\n{multi_return}\n\n{op_assign}\n\n    return op".format(
                out_nodes=output_nodes_str,
                multi_return=multi_return_str,
                op_assign=op_assignments
            )

        value = return_values[0]
        # The type is only validated here; the node is always a Matrix.
        if not re.search(pattern, value[1]):
            raise AttributeError("Error in pattern match: " + str(value) + "\n" +
                                 function_name + "\n" + str(parameters) + "\n" + str(return_values))
        params = ("{sds_context}," +
                  "\n        \'{function_name}\'," +
                  "\n        named_input_nodes=params_dict").format(
            sds_context=sds_context,
            function_name=function_name
        )
        return "return Matrix({params})".format(params=params)

    def generate_output_nodes(self, return_values, pattern, sds_context):
        """
        Builds the code for the output nodes of a multi-return function.

        @return: tuple of (node creation lines plus the ``output_nodes``
            list, lines assigning ``op`` as input of every node)
        @raise ValueError: if a return type is not one of matrix, frame,
            double, boolean, integer or list
        """
        lines = []
        op_assignment = []
        output_nodes = "\n    output_nodes = ["
        for idx, value in enumerate(return_values):
            match = re.search(pattern, value[1])
            if match is None:
                raise ValueError("Unknown type " + str(value[1]))
            output_type = match[0].lower()
            object_type = self.__class__._return_object_types.get(output_type)
            if object_type is None:
                raise ValueError("Unknown type " + output_type)

            lines.append("    vX_{idx} = {obj}({sds}, '')".format(
                idx=idx, obj=object_type, sds=sds_context))
            output_nodes += "vX_{idx}, ".format(idx=idx)
            op_assignment.append(
                "    vX_{idx}._unnamed_input_nodes = [op]".format(idx=idx))
        output_nodes += "]"
        lines = "\n".join(lines) + output_nodes
        op_assignment = "\n".join(op_assignment)
        return lines, op_assignment

    def generate_multireturn(self, sds_context, function_name):
        """
        Builds the statement creating the ``MultiReturn`` operation ``op``.
        """
        return ("    op = MultiReturn({sds}, \'{function_name}\', output_nodes," +
                " named_input_nodes=params_dict)").format(
            sds=sds_context, function_name=function_name)
| |
| |
class PythonAPIDocumentationGenerator(object):
    """
    Builds the docstring of a generated function from the parsed dml header.
    All emitted lines carry the 4-space indentation of the function body.
    """

    param_str = "\n    :param {pname}: {meaning}"

    def __init__(self):
        super(PythonAPIDocumentationGenerator, self).__init__()

    def generate_documentation(self, header_data: dict, data: dict):
        """
        Generates function header for PythonAPI and stores it in
        data['function_header'].
        @param header_data:
            {
                'description': 'some text',
                'parameters': [('param1','description'), ...],
                'return_values': [('retval1', 'description'),...]
            }
        @param data: dict receiving the key 'function_header'. The value is
            "" for an empty description, only the description if there are
            no return values, and otherwise description, parameters and
            return values; non-empty headers include '\"\"\"' at start and end
        """
        description = header_data["description"].replace("\n", "\n    ")
        input_param = self.header_parameter_string(header_data["parameters"])
        output_param = self.header_return_string(header_data["return_values"])

        if description == "":
            data['function_header'] = ""
        elif header_data["return_values"] == []:
            data['function_header'] = '"""\n    ' + description + '"""\n'
        else:
            data['function_header'] = '"""\n    ' + description + \
                input_param + output_param + '\n    """\n'

    def header_parameter_string(self, parameter: List[Tuple[str, str]]) -> str:
        """
        Returns one ':param name: meaning' line per (name, meaning) pair,
        preceded by an indented separator line.
        """
        return "\n    " + "".join(
            self.__class__.param_str.format(pname=param[0], meaning=param[1])
            for param in parameter)

    def header_return_string(self, parameter: List[Tuple[str, str]]) -> str:
        """
        Returns one ':return: meaning' line per (name, meaning) pair.
        """
        return "".join("\n    :return: " + param[1] for param in parameter)
| |
| |
def format_exception(e):
    """
    Renders exception ``e`` as traceback text.

    The text consists of the current call stack (without the frames of this
    function and its direct caller), the traceback attached to ``e`` and the
    final "Type: message" line, without trailing newline. The information
    is taken from ``e`` itself, so the function also works when it is not
    called from inside the ``except`` block that caught ``e``.

    @param e: the exception to render
    @return: the formatted traceback
    """
    # the last two entries are this function and the caller reporting e
    exception_list = traceback.format_stack()[:-2]
    exception_list.extend(traceback.format_tb(e.__traceback__))
    exception_list.extend(traceback.format_exception_only(type(e), e))

    exception_str = "Traceback (most recent call last):\n"
    exception_str += "".join(exception_list)
    # Removing the last \n
    return exception_str[:-1]
| |
| |
if __name__ == "__main__":
    # Locate the dml builtins: relative to the working directory when its
    # path contains "python", otherwise relative to this file.
    if "python" not in os.getcwd():
        source_path = os.path.join(os.path.dirname(
            __file__), "../../../../", 'scripts', 'builtin')
    else:
        source_path = os.path.join("../../../", 'scripts', 'builtin')

    file_generator = PythonAPIFileGenerator(source_path)
    fun_generator = PythonAPIFunctionGenerator()
    f_parser = FunctionParser(source_path)
    doc_generator = PythonAPIDocumentationGenerator()

    for dml_file in f_parser.files():
        # Parsing and code generation may fail per file; such files are
        # reported and skipped without aborting the whole run.
        try:
            header_data = f_parser.parse_header(dml_file)
            data = f_parser.parse_function(dml_file)
            if not data:
                continue
            f_parser.check_parameters(header_data, data)
            doc_generator.generate_documentation(header_data, data)

            if data['function_header'] == "":
                print(f"[WARNING] in : '{dml_file}' failed parsing docs.")

            script_content = fun_generator.generate_function(data)
        except Exception as e:
            print(f"[ERROR] error in : '{dml_file}' \n{e} \n{format_exception(e)}.")
            continue

        function_name = data["function_name"]
        file_generator.generate_file(function_name, script_content, dml_file)

        # Generate test cases using the code blocks
        test_examples = header_data.get("code_blocks")
        if not test_examples:
            print(f"[INFO] Skipping python test case creation for '{function_name}': No code example.")
        else:
            # Don't add test number if only one example
            numbered = len(test_examples) > 1
            for i, test_example in enumerate(test_examples):
                test_example_name = f"{function_name}_{i}" if numbered else function_name
                file_generator.generate_test_file(test_example_name, test_example)
                # TODO: dml test case files should also be created

        # Generate rst file
        file_generator.generate_rst_file(function_name)

    file_generator.function_names.sort()
    file_generator.generate_init_file()