blob: b21a4f6c6b5486b198d7d12497e08ee2b9af80bc [file] [log] [blame]
# -------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# -------------------------------------------------------------
import json
import os
import re
import sys
import traceback
from dml_parser import FunctionParser
from typing import List, Tuple
# Names of builtin algorithm modules that are added by hand rather than written
# by this generator. PythonAPIFileGenerator.__init__ pre-registers each one
# (only if its file already exists in the target directory) so that it is still
# listed in the generated __init__.py.
manually_added_algorithm_builtins = ["cov", "solve"]
class PythonAPIFileGenerator(object):
    """Write the files that make up the generated Python API.

    For every builtin the generator can emit a wrapper module (``target_path``),
    unit-test modules built from documentation examples (``test_path``) and an
    ``.rst`` page (``rst_path``). ``generate_init_file`` finally writes the
    package ``__init__.py`` (``init_path``) listing every registered function.
    """

    # Output locations, resolved relative to the parent of this file's directory.
    target_path = os.path.join(os.path.dirname(os.path.dirname(
        __file__)), 'systemds', 'operator', 'algorithm', 'builtin')
    test_path = os.path.join(os.path.dirname(os.path.dirname(
        __file__)), 'tests', 'auto_tests')
    rst_path = os.path.join(os.path.dirname(os.path.dirname(
        __file__)), 'docs', 'source', 'api', 'operator', 'algorithms')

    # Template files, resolved relative to this file's directory in __init__.
    license_path = os.path.join('resources', 'template_python_script_license')
    rst_license_path = os.path.join('resources', 'template_python_script_rst_license')
    template_path = os.path.join('resources', 'template_python_script_imports')

    source_path: str
    license: str
    rst_license: str
    imports: str
    generated_by: str
    generated_from: str

    init_path = os.path.join(os.path.dirname(os.path.dirname(
        __file__)), 'systemds', 'operator', 'algorithm', '__init__.py')
    init_import = u"from .builtin.{function} import {function} \n"
    init_all = u"__all__ = {functions} \n"

    def __init__(self, source_path: str, extension: str = 'py'):
        """Create the output directories and load the text templates.

        :param source_path: directory that the DML file paths passed to
            ``generate_file`` are made relative to
        :param extension: file extension (without dot) of the generated modules
        """
        super(PythonAPIFileGenerator, self).__init__()
        self.source_path = source_path
        self.extension = '.{extension}'.format(extension=extension)
        os.makedirs(self.__class__.target_path, exist_ok=True)
        os.makedirs(self.__class__.test_path, exist_ok=True)
        os.makedirs(self.__class__.rst_path, exist_ok=True)

        self.function_names = list()
        for name in manually_added_algorithm_builtins:
            # only add files which actually exist, to avoid breaking
            if self.check_manually_added_file(name + self.extension):
                self.function_names.append(name)

        path = os.path.dirname(__file__)
        with open(os.path.join(path, self.__class__.template_path), 'r',
                  encoding="utf-8") as f:
            self.imports = f.read()
        with open(os.path.join(path, self.__class__.license_path), 'r',
                  encoding="utf-8") as f:
            self.license = f.read()
        with open(os.path.join(path, self.__class__.rst_license_path), 'r',
                  encoding="utf-8") as f:
            self.rst_license = f.read()

        self.generated_by = "# Autogenerated By   : src/main/python/generator/generator.py\n"
        self.generated_from = "# Autogenerated From : "

    def check_manually_added_file(self, name: str):
        """Return whether ``name`` exists as a file in ``target_path``.

        A missing file is reported on stdout but is not treated as fatal.
        """
        path = os.path.join(self.target_path, name)
        exists = os.path.isfile(path)
        if not exists:
            print("[ERROR] Manually added builtin algorithm not found : \'{file_name}\' \n .".format(file_name=path))
        return exists

    def generate_file(self, filename: str, file_content: str, dml_file: str):
        """Write one wrapper module and register it for ``__init__.py``.

        The module consists of the license, the "autogenerated" markers, the
        import template and ``file_content``.

        :param filename: module name without extension; also the name that is
            appended to ``function_names``
        :param file_content: generated source of the wrapper function
        :param dml_file: path of the originating DML script; written into the
            header relative to ``source_path``
        """
        target_file = os.path.join(self.target_path, filename) + self.extension
        relative_path = os.path.relpath(dml_file, start=self.source_path)
        with open(target_file, "w", encoding="utf-8") as new_script:
            new_script.write(self.license)
            new_script.write(self.generated_by)
            new_script.write(f"{self.generated_from}scripts/builtin/{relative_path}\n")
            new_script.write(self.imports)
            new_script.write(file_content)
        self.function_names.append(filename)

    def generate_test_file(self, function_name: str, code_block: str = None):
        """Write ``test_<function_name>`` containing one unittest test case.

        Lines of ``code_block`` that start with a ``>>>`` or ``...`` prompt are
        emitted as code whose stdout is captured; every other line is collected
        as the expected output. Without a code block an empty (passing) test
        is written so the generated module is still valid Python.

        :param function_name: name used for the file, class and test method
        :param code_block: doctest-style example, or None
        """
        target_file = os.path.join(self.test_path, f"test_{function_name}") + self.extension
        with open(target_file, "w", encoding="utf-8") as test_script:
            test_script.write(self.license)
            test_script.write(self.generated_by)
            test_script.write("import unittest, contextlib, io\n\n\n")
            test_script.write(f"class Test{function_name.upper()}(unittest.TestCase):\n")
            test_script.write(f"    def test_{function_name}(self):\n")
            if code_block:
                test_script.write("        # Example test case provided in python the code block\n")
                test_script.write("        buf = io.StringIO()\n")
                test_script.write("        with contextlib.redirect_stdout(buf):\n")
                expected_parts = []
                wrote_code = False
                # keepends=True -> the trailing '\n' of every line is preserved
                for raw_line in code_block.splitlines(keepends=True):
                    stripped = raw_line.lstrip()
                    if stripped.startswith((">>>", "...")):
                        # Drop the 3-character prompt and the single separating
                        # space; deeper indentation of the code is kept.
                        code_line = stripped[3:]
                        if code_line.startswith(" "):
                            code_line = code_line[1:]
                        if code_line.strip():
                            test_script.write(f"            {code_line}")
                            wrote_code = True
                        else:
                            test_script.write("\n")
                    else:
                        expected_parts.append(raw_line)
                if not wrote_code:
                    # A ``with`` statement needs a body even without code lines.
                    test_script.write("            pass\n")
                expected = "".join(expected_parts).lstrip("\n")
                test_script.write(f'\n        expected = """{expected}"""\n')
                test_script.write("        self.assertEqual(buf.getvalue().strip(), expected)\n\n")
            else:
                # Keep the generated method syntactically valid.
                test_script.write("        pass\n")
            test_script.write("\nif __name__ == '__main__':\n")
            test_script.write("    unittest.main()\n")

    def generate_rst_file(self, function_name: str):
        """Write ``<function_name>.rst`` with an ``autofunction`` directive."""
        target_file = os.path.join(self.rst_path, f"{function_name}") + ".rst"
        with open(target_file, "w", encoding="utf-8") as rst_script:
            rst_script.write(self.rst_license)
            rst_script.write("\n\n")
            rst_script.write(function_name + "\n")
            rst_script.write("=" * len(function_name) + "\n\n")
            rst_script.write(f".. autofunction:: systemds.operator.algorithm.{function_name}")

    def generate_init_file(self):
        """Write ``__init__.py`` importing and exporting all ``function_names``."""
        with open(self.init_path, "w", encoding="utf-8") as init_file:
            init_file.write(self.license)
            init_file.write(self.generated_by)
            init_file.write("\n")
            for f in self.function_names:
                init_file.write(self.init_import.format(function=f))
            init_file.write("\n")
            # One list entry per line in the generated __all__.
            init_file.write(self.init_all.format(
                functions=self.function_names).replace(",", ",\n"))
class PythonAPIFunctionGenerator(object):
    """Render the Python source of one wrapper function from parsed DML data.

    The type mapping is loaded from ``resources/type_mapping.json`` (next to
    this file) once, when the class is defined.
    """

    # Skeleton of a generated function; the body is indented by four spaces.
    api_template = u"""def {function_name}({parameters}):
    {header}
    {params_dict}
    {api_call}\n"""

    kwargs_parameter_string = u"**kwargs: Dict[str, VALID_INPUT_TYPES]"
    kwargs_result = u"params_dict.update(kwargs)"

    type_mapping_file = os.path.join('resources', 'type_mapping.json')
    # Leading run of characters up to the first '[' or whitespace.
    type_mapping_pattern = r"^([^\[\s]+)"

    path = os.path.dirname(__file__)
    type_mapping_path = os.path.join(path, type_mapping_file)
    with open(type_mapping_path, 'r') as mapping:
        type_mapping = json.load(mapping)

    # DML return type (lower case) -> Python node class used for multi-returns.
    _output_object_types = {
        "matrix": "Matrix",
        "frame": "Frame",
        "double": "Scalar",
        "boolean": "Scalar",
        "integer": "Scalar",
        "list": "List",
    }

    def __init__(self):
        super(PythonAPIFunctionGenerator, self).__init__()

    def generate_function(self, data: dict) -> str:
        """Return the source code of the wrapper function described by ``data``.

        :param data: dictionary with the keys
            ``'function_name'`` (str),
            ``'function_header'`` (docstring text including its quotes, or a
            falsy value),
            ``'parameters'`` (``[[name, type, default], ...]``) and
            ``'return_values'`` (``[(name, type), ...]`` or None)
        :return: the function definition
        """
        function_name = data['function_name']
        parameters = self.format_param_string(
            data['parameters'], len(function_name))
        header = data['function_header'] if data['function_header'] else ""
        params_dict = self.format_params_dict_string(data['parameters'])
        api_call = self.format_api_call(
            data['parameters'],
            data['return_values'],
            data['function_name']
        )
        return self.__class__.api_template.format(
            function_name=function_name, parameters=parameters, header=header,
            params_dict=params_dict, api_call=api_call)

    def replace_types(self, item: str):
        """Map a DML type name to its Python counterpart.

        The lower-cased prefix of ``item`` (up to the first ``[`` or
        whitespace) is looked up in ``type_mapping["type"]``. ``item`` is
        returned unchanged when it is empty, has no such prefix or has no
        mapping entry.
        """
        if not item:
            return item
        match = re.search(self.__class__.type_mapping_pattern, str(item).lower())
        if match is None:
            return item
        return self.__class__.type_mapping["type"].get(match.group(), item)

    def format_param_string(self, parameters: List[Tuple[str]], nameLength: int) -> str:
        """Build the parameter list of the generated signature.

        Parameters without a default become ``name: type`` entries; if any
        parameter has a default, a single ``**kwargs`` entry is appended.
        Entries after the first are placed on their own line, aligned behind
        ``def <name>(``.

        Note: the type slot (index 1) of every parameter is replaced in place
        with the mapped Python type.

        :param parameters: ``[[name, type, default], ...]``
        :param nameLength: length of the function name, used for alignment
        :raises AttributeError: if a parameter cannot be formatted
        """
        try:
            entries = []
            has_optional = False
            # len("def ") + nameLength + len("(")
            newline_spacing = "\n" + " " * (nameLength + 5)

            for param in parameters:
                param[1] = self.replace_types(param[1])
                if "[" in param[1] or "[" in param[0]:
                    raise AttributeError(
                        "Failed parsing param" + str(param) + "\n" + str(parameters))
                if param[2] is not None:
                    has_optional = True
                else:
                    entries.append("{name}: {typ}".format(
                        name=param[0], typ=param[1]))

            if has_optional:
                entries.append(self.__class__.kwargs_parameter_string)
            # Joining avoids a stray leading comma when nothing is required.
            return ("," + newline_spacing).join(entries)
        except Exception as e:
            raise AttributeError("Failed Formatting parameter strings: " +
                                 str(parameters) + " " + format_exception(e)) from e

    def format_params_dict_string(self, parameters: List[Tuple[str]]) -> str:
        """Build the statement(s) that create ``params_dict`` in the wrapper.

        Required parameters are put into the dict literal; when optional
        parameters exist a ``params_dict.update(kwargs)`` line follows.

        :param parameters: ``[[name, type, default], ...]``
        :return: the generated statement(s), or "" for an empty parameter list
        """
        if not len(parameters):
            return ""

        required = [u"\'{name}\': {name}".format(name=param[0])
                    for param in parameters if param[2] is None]
        has_optional = any(param[2] is not None for param in parameters)

        # Always emit the assignment so that an all-optional signature still
        # defines params_dict before it is updated.
        result = u"params_dict = {" + u", ".join(required) + u"}"
        if has_optional:
            result = u"{result}\n    {kwargs}".format(
                result=result,
                kwargs=self.__class__.kwargs_result
            )
        return result

    def format_api_call(self,
                        parameters: List[Tuple[str]],
                        return_values: List[Tuple[str]],
                        function_name: str
                        ) -> str:
        """Build the statements that create and return the operation node.

        The SystemDS context is taken from the first parameter of the wrapper.

        :param parameters: ``[[name, type, default], ...]``; must not be empty
        :param return_values: ``[(name, type), ...]`` or None for no return
        :param function_name: name of the builtin
        :raises IndexError: if ``parameters`` is empty
        :raises AttributeError: if the single return type cannot be parsed
        """
        if not parameters:
            raise IndexError(
                "Cannot generate API call for '" + str(function_name) +
                "': at least one parameter is needed to obtain the sds_context")
        sds_context = "{param}.sds_context".format(param=parameters[0][0])
        # Type name up to the first '['.
        pattern = r"^[^\[]+"

        if return_values is None:
            return ("return OperationNode({sds_context}," +
                    "\n        \'{function_name}\'," +
                    "\n        named_input_nodes=params_dict," +
                    "\n        output_type=OutputType.NONE)").format(
                sds_context=sds_context,
                function_name=function_name
            )

        if len(return_values) > 1:
            output_nodes_str, op_assignments = self.generate_output_nodes(
                return_values, pattern, sds_context)
            multi_return_str = self.generate_multireturn(
                sds_context, function_name)
            return "\n{out_nodes}\n\n{multi_return}\n\n{op_assign}\n\n    return op".format(
                out_nodes=output_nodes_str,
                multi_return=multi_return_str,
                op_assign=op_assignments
            )

        value = return_values[0]
        if not re.search(pattern, value[1]):
            raise AttributeError("Error in pattern match: " + str(value) + "\n" +
                                 function_name + "\n" + str(parameters) + "\n" + str(return_values))
        # NOTE(review): a single return value is always wrapped in Matrix,
        # regardless of the parsed return type - confirm this is intended.
        call_args = ("{sds_context}," +
                     "\n        \'{function_name}\'," +
                     "\n        named_input_nodes=params_dict").format(
            sds_context=sds_context,
            function_name=function_name
        )
        return "return Matrix({params})".format(params=call_args)

    def generate_output_nodes(self, return_values, pattern, sds_context):
        """Build the output-node statements of a multi-return wrapper.

        :param return_values: ``[(name, type), ...]``
        :param pattern: regex extracting the type name from the type string
        :param sds_context: expression yielding the SystemDS context
        :return: tuple of (node creation lines followed by the
            ``output_nodes`` list, lines wiring every node to ``op``)
        :raises ValueError: for a return type without a Python node class
        """
        lines = []
        op_assignment = []
        node_names = []
        for idx, value in enumerate(return_values):
            match = re.search(pattern, value[1])
            output_type = match[0].lower() if match else None
            object_type = self.__class__._output_object_types.get(output_type)
            if object_type is None:
                raise ValueError("Unknown type " + str(value[1]))
            lines.append("    vX_{idx} = {obj}({sds}, '')".format(
                idx=idx, obj=object_type, sds=sds_context))
            node_names.append("vX_{idx}, ".format(idx=idx))
            op_assignment.append(
                "    vX_{idx}._unnamed_input_nodes = [op]".format(idx=idx))

        output_nodes = "\n    output_nodes = [" + "".join(node_names) + "]"
        return "\n".join(lines) + output_nodes, "\n".join(op_assignment)

    def generate_multireturn(self, sds_context, function_name):
        """Return the statement creating the ``MultiReturn`` node ``op``."""
        return ("    op = MultiReturn({sds}, \'{function_name}\', output_nodes," +
                " named_input_nodes=params_dict)").format(
            sds=sds_context, function_name=function_name)
class PythonAPIDocumentationGenerator(object):
    """Turn parsed DML header information into the docstring of a wrapper."""

    # Template of one parameter line of the docstring.
    param_str = "\n :param {pname}: {meaning}"

    def __init__(self):
        super(PythonAPIDocumentationGenerator, self).__init__()

    def generate_documentation(self, header_data: dict, data: dict):
        """Store the generated docstring in ``data['function_header']``.

        :param header_data: dictionary with the keys ``'description'`` (str),
            ``'parameters'`` (``[(name, description), ...]``) and
            ``'return_values'`` (``[(name, description), ...]``)
        :param data: function data dictionary that receives the result; the
            header is "" when the description is empty, the description only
            when there are no return values, and otherwise the description
            followed by the parameter and return lines
        """
        description = header_data["description"].replace("\n", "\n ")
        input_param = self.header_parameter_string(header_data["parameters"])
        output_param = self.header_return_string(header_data["return_values"])

        if description == "":
            header = ""
        elif header_data["return_values"] == []:
            header = '"""\n ' + description + '"""\n'
        else:
            header = "".join(
                ['"""\n ', description, input_param, output_param, '\n """\n'])
        data['function_header'] = header

    def header_parameter_string(self, parameter: dict) -> str:
        """Return one ``:param`` line per entry, preceded by a blank line."""
        template = self.__class__.param_str
        rendered = [template.format(pname=entry[0], meaning=entry[1])
                    for entry in parameter]
        return "\n " + "".join(rendered)

    def header_return_string(self, parameter: dict) -> str:
        """Return one ``:return:`` line per entry."""
        return "".join("\n :return: " + entry[1] for entry in parameter)
def format_exception(e):
    """Return a traceback-style description of the exception ``e``.

    The text consists of the current call stack (without this function and
    its direct caller), the frames of the exception's own traceback and the
    final "Type: message" line, without a trailing newline.

    :param e: the exception to describe; when None, the exception currently
        being handled (``sys.exc_info()``) is used instead
    :return: the formatted traceback text
    """
    if e is None:
        e = sys.exc_info()[1]

    # Drop the frames of format_exception itself and of its caller; the
    # caller's frame is where the exception's own traceback starts.
    entries = traceback.format_stack()[:-2]
    entries.extend(traceback.format_tb(getattr(e, "__traceback__", None)))
    entries.extend(traceback.format_exception_only(type(e), e))

    exception_str = "Traceback (most recent call last):\n" + "".join(entries)
    # Removing the last \n
    return exception_str[:-1]
if __name__ == "__main__":
    # The location of scripts/builtin depends on where the script is started:
    # relative to the working directory when that path contains "python",
    # otherwise relative to this file.
    source_path = (
        os.path.join("../../../", 'scripts', 'builtin')
        if "python" in os.getcwd()
        else os.path.join(os.path.dirname(__file__),
                          "../../../../", 'scripts', 'builtin')
    )

    file_generator = PythonAPIFileGenerator(source_path)
    fun_generator = PythonAPIFunctionGenerator()
    f_parser = FunctionParser(source_path)
    doc_generator = PythonAPIDocumentationGenerator()

    for dml_file in f_parser.files():
        # Parse and render first; a failure only skips the current DML file.
        try:
            header_data = f_parser.parse_header(dml_file)
            data = f_parser.parse_function(dml_file)
            if not data:
                continue
            f_parser.check_parameters(header_data, data)
            doc_generator.generate_documentation(header_data, data)
            if data['function_header'] == "":
                print("[WARNING] in : \'{file_name}\' failed parsing docs.".format(
                    file_name=dml_file))
            script_content = fun_generator.generate_function(data)
        except Exception as e:
            print("[ERROR] error in : \'{file_name}\' \n{err} \n{trace}.".format(
                file_name=dml_file, err=e, trace=format_exception(e)))
            continue

        function_name = data["function_name"]
        file_generator.generate_file(function_name, script_content, dml_file)

        # Generate test cases using the code blocks
        test_examples = header_data.get("code_blocks", None)
        if not test_examples:
            print(f"[INFO] Skipping python test case creation for '{function_name}': No code example.")
        else:
            # Don't add test number if only one example
            numbered = len(test_examples) > 1
            for i, test_example in enumerate(test_examples):
                test_example_name = f"{function_name}_{i}" if numbered else function_name
                file_generator.generate_test_file(test_example_name, test_example)
            # TODO: dml test case files should also be created

        # Generate rst file
        file_generator.generate_rst_file(function_name)

    file_generator.function_names.sort()
    file_generator.generate_init_file()