# blob: e9079b95d63554083f2faa6b4c2ce5af2815a08c
# -------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# -------------------------------------------------------------
import json
import os
import re
import textwrap
class FunctionParser(object):
    """Parse DML builtin script files: function signatures and header docs.

    A parser instance is bound to a directory (``path``) and a file extension.
    ``files()`` enumerates matching scripts, ``parse_function()`` extracts the
    signature of the ``m_``/``f_``/``s_`` function defined in a script, and
    ``parse_header()`` extracts the comment header (description, INPUT and
    OUTPUT tables, embedded python code blocks).
    """

    header_input_pattern = r"^[ \t\n]*[#]+[ \t\n]*input[ \t\n\w:;.,#]*[\s#\-]*[#]+[\w\s\d:,.()\" \t\n\-]*[\s#\-]*$"
    header_output_pattern = r"[\s#\-]*[#]+[ \t]*(return|output)[ \t\w:;.,#]*[\s#\-]*[#]+[\w\s\d:,.()\" \t\-]*[\s#\-]*$"
    function_pattern = r"^[fms]_[\w]+[ \t\n]*=[ \t\n]+function[^#{]*"
    # parameter_pattern = r"^m_[\w]+[\s]+=[\s]+function[\s]*\([\s]*(?=return)[\s]*\)[\s]*return[\s]*\([\s]*([\w\[\]\s,\d=.\-_]*)[\s]*\)[\s]*"
    header_parameter_pattern = r"[\s#\-]*[#]+[ \t]*([\w|-]+)[\s]+([\w]+)[\s]+([\w,\d.\"\-]+)[\s]+([\w|\W]+)"
    divider_pattern = r"[\s#\-]*"
    type_mapping_file = os.path.join('resources', 'type_mapping.json')

    # 1-based number of the first line that parse_header() treats as part of
    # the documentation header; everything before it is skipped unread.
    header_first_line = 22

    # Matches a "# .. code-block:: python" marker and lazily captures all text
    # up to (not including) the next marker or the "# INPUT:" line.
    code_block_pattern = re.compile(
        r"""
        ^\#\s*\.\.\s*code-block::\s*python      # the code-block marker line
        (.*?)                                   # capture the example (lazy)
        (?=                                     # stop just before
            ^\#\s*(?:INPUT:|                    #   the INPUT section, or
            \.\.\s*code-block::\s*python)       #   another code-block marker
        )
        """,
        re.MULTILINE | re.DOTALL | re.VERBOSE,
    )

    def __init__(self, path: str, extension: str = 'dml'):
        """
        @param path: directory in which files() looks for script files
        @param extension: file extension (without the dot) of the scripts
        """
        super().__init__()
        self.path = path
        self.extension = '.{extension}'.format(extension=extension)
        # NOTE: the former bare `self.files()` call was dropped; it only
        # created a generator object that was never iterated.

    def parse_function(self, path: str):
        """
        Parse the signature of the function defined in the file at `path`.

        @param path: path of file to parse
        @return: None (after printing an [INFO] line) when no function
            definition is found in the file, otherwise
            {
                'function_name': 'some_name',   # file name without extension
                'parameters': [[name, type, default_value], ...],
                'return_values': [[name, type, default_value], ...] or None
            }
        @raise AttributeError: if the definition is found but its parameter
            list is empty or cannot be parsed
        """
        file_name = os.path.basename(path)
        function_name, _ = os.path.splitext(file_name)
        try:
            function_definition = self.find_function_definition(path)
        except AttributeError:
            print(f"[INFO] Skipping '{function_name}': does not match function name pattern. It is likely an internal function.")
            return None

        signature = function_definition.split("function", 1)[1]
        # Split on the stand-alone keyword only, so identifiers that merely
        # contain "return" (e.g. a parameter `returnAll`) stay intact.
        func_split = re.split(r"\breturn\b", signature)
        param_str = self.extract_param_str(func_split[0])
        retval_str = None
        if len(func_split) > 1:
            retval_str = self.extract_param_str(func_split[1])

        parameters = None
        return_values = None
        if param_str:
            parameters = self.get_parameters(param_str)
            return_values = self.get_parameters(retval_str)
        if not parameters:
            # str() because retval_str is None when there is no return clause;
            # plain concatenation would raise TypeError and hide this error.
            raise AttributeError("Unable to match to function definition:\n" + function_definition +
                                 "\n parameter_str: " + param_str + "\n retVal: " + str(retval_str))
        return {'function_name': function_name,
                'parameters': parameters, 'return_values': return_values}

    def extract_param_str(self, a: str):
        """
        Return the text between the first "(" and the last ")" of `a`.

        @raise AttributeError: if `a` lacks an opening or closing parenthesis
        """
        try:
            return a[a.index("(") + 1: a.rindex(")")]
        except ValueError as err:
            raise AttributeError("failed extracting from: " + a) from err

    def get_parameters(self, param_str: str):
        """
        Split a comma separated parameter list and parse every entry.

        Commas nested inside parentheses (e.g. a default value such as
        `list(a, b)`) do not separate parameters; such fragments are glued
        back together with a "," between them.

        @param param_str: text between the parentheses of a signature
        @return: None if `param_str` is None, [] if it is blank, otherwise a
            list of [name, type, default_value] entries
        """
        if param_str is None:
            return None
        if not param_str.strip():
            return []

        combined = []
        depth = 0  # count of "(" still open before the current fragment
        for fragment in re.split(r",[\s]*", param_str):
            if depth > 0:
                # still inside a nested call: restore the comma that the
                # split removed and continue the previous parameter
                combined[-1] += "," + fragment
            else:
                combined.append(fragment)
            depth += fragment.count("(") - fragment.count(")")
        return [self.parse_single_parameter(entry.strip()) for entry in combined]

    def parse_single_parameter(self, param: str):
        """
        Parse one `<type> <name>[ = <default>]` declaration.

        @param param: a single, stripped parameter declaration
        @return: [name, dml_type, default_value]; default_value is None when
            no default could be recognized
        @raise AttributeError: if the declaration has fewer than two tokens or
            the resulting name/type still contains "(", "]" or "="
        """
        splitted = re.split(r"[\s]+", param)
        if len(splitted) < 2:
            raise AttributeError("Failed parsing " + param)
        dml_type = splitted[0]
        name = splitted[1]
        default_value = None
        if len(splitted) == 4:
            # form: "<type> <name> = <default>"
            if splitted[2] == "=":
                default_value = splitted[3]
        elif "=" in name:
            # form: "<type> <name>=<default>"
            default_split = name.split("=")
            name = default_split[0]
            default_value = default_split[1]
        if "(" in name or "=" in name or "]" in name or "=" in dml_type:
            raise AttributeError("failed Parsing " +
                                 param + " " + str(splitted))
        return [name, dml_type, default_value]

    def parse_header(self, path: str):
        """
        Parse the documentation header of the file at `path`.

        @param path: path of file to parse
        @return:
            {
                'description': 'free text before the INPUT section',
                'parameters': [('param1', 'description'), ...],
                'return_values': [('retval1', 'description'), ...],
                'code_blocks': ['dedented python example', ...]
            }
            'parameters' and 'return_values' are empty (and a [WARNING] line
            is printed) when description, INPUT or OUTPUT text is missing.
        """
        description, h_input, h_output = self._read_header_sections(path)
        if description == "" or h_input == "" or h_output == "":
            file_name = os.path.basename(path)
            print("[WARNING] Could not parse header in file \'{file_name}\'.".format(
                file_name=file_name))
            input_parameters = []
            output_parameters = []
        else:
            input_parameters = self.parse_input_output_string(h_input)
            output_parameters = self.parse_input_output_string(h_output)

        with open(path, 'r') as f:
            content = f.read()

        return {'description': description,
                'parameters': input_parameters,
                'return_values': output_parameters,
                'code_blocks': self._extract_code_blocks(content)}

    def _read_header_sections(self, path: str):
        """Return (description, input_text, output_text) of the comment header."""
        description = ""
        h_input = ""
        h_output = ""
        in_input = False
        in_output = False
        with open(path, 'r') as f:
            line = ""
            # Skip ahead; after this loop `line` is line `header_first_line`.
            for _ in range(self.__class__.header_first_line):
                line = f.readline()
            # startswith() is also False for "" (EOF), so short or truncated
            # files end the loop instead of raising IndexError.
            while line.startswith('#'):
                marker = False
                if "# INPUT:" in line:
                    in_input = True
                    marker = True
                elif "# OUTPUT:" in line:
                    in_input = False
                    in_output = True
                    marker = True
                if marker:
                    # skip the marker line and the divider line below it
                    f.readline()
                    line = f.readline()
                    if not line.startswith('#'):
                        # header ended right after the marker; do not pull
                        # non-comment text into the section
                        break
                if in_output:
                    if "----------" not in line:
                        h_output += line[1:]
                elif in_input:
                    if "----------" not in line:
                        h_input += line[1:]
                else:
                    description += line[1:]
                line = f.readline()
        return description, h_input, h_output

    def _extract_code_blocks(self, content: str):
        """Return the dedented python examples found in `content`."""
        code_blocks = []
        for match in self.__class__.code_block_pattern.finditer(content):
            # drop the leading comment markers, then the now-empty lines
            stripped = [raw.lstrip("#") for raw in match.group(1).splitlines()]
            kept = [code_line for code_line in stripped if code_line != ""]
            code_blocks.append(textwrap.dedent("\n".join(kept)))
        return code_blocks

    def parse_input_output_string(self, data: str):
        """
        Parse an INPUT/OUTPUT table into (name, description) tuples.

        A line whose second character is a space continues the description of
        the previous entry; any other non-blank line starts a new entry whose
        name is the first whitespace-delimited token.

        @param data: section text as collected by parse_header
        @return: list of (name, description) tuples
        """
        ret = []
        for line in data.split("\n"):
            if not line:
                continue
            # slicing avoids IndexError on one-character lines; a continuation
            # without a preceding entry is treated as a new entry instead
            if ret and line[1:2] == " ":
                prev = ret[-1]
                ret[-1] = (prev[0], prev[1] + "\n " + line.strip())
                continue
            vd = line.strip().split(None, 1)
            if not vd:
                continue  # whitespace-only line
            ret.append((vd[0], vd[1].strip() if len(vd) > 1 else ""))
        return ret

    def find_function_definition(self, path: str):
        """
        Return the text of the first function definition in the file.

        The text is whatever `function_pattern` matches (case-insensitive,
        multi-line): from the function name up to the first "{" or "#".

        @raise AttributeError: if the file contains no match
        """
        with open(path, 'r') as f:
            content = f.read()
        match = re.search(pattern=self.__class__.function_pattern,
                          string=content, flags=re.I | re.M)
        if match is None:
            raise AttributeError("Function definition not found in : " + path)
        return match.group(0)

    def files(self):
        """
        generator function to find files in self.path, that end with self.extension
        (yielded as joined paths, in sorted file name order)
        """
        for entry in sorted(os.listdir(self.path)):
            _, extension = os.path.splitext(entry)
            if extension == self.extension:
                yield os.path.join(self.path, entry)

    def check_parameters(self, header, data):
        """
        Load the type mapping and normalize header/function parameter info.

        The comparisons that would report mismatches are disabled (see the
        commented-out warnings below), so this method currently only reads
        the mapping file and returns None.

        @param header: result of parse_header
        @param data: result of parse_function
        """
        type_mapping_pattern = r"^([^\[\s]+)"
        path = os.path.dirname(__file__)
        type_mapping_path = os.path.join(
            path, self.__class__.type_mapping_file)
        with open(type_mapping_path, 'r') as mapping:
            type_mapping = json.load(mapping)
        known_types = type_mapping["type"]

        header_param_names = [p[0].lower() for p in header["parameters"]]
        data_param_names = [p[0].lower() for p in data["parameters"]]
        # if header_param_names != data_param_names:
        #     print("[WARNING] The parameter names of the function does not match with the documentation "
        #           "for file \'{file_name}\'.".format(file_name=data["function_name"]))

        # NOTE(review): parse_header yields (name, description) tuples, so
        # p[1] here looks like the description rather than a type - confirm
        # before re-enabling the type comparison below.
        header_param_type = [p[1].lower() for p in header["parameters"]]
        header_param_type = [known_types.get(item, item)
                             for item in header_param_type]

        data_param_type = []
        for item in [p[1].lower() for p in data["parameters"]]:
            key = str(item).lower()
            if item:
                # keep only the part before "[" / whitespace; fall back to the
                # full string when the pattern does not match
                base = re.search(type_mapping_pattern, key)
                if base:
                    key = base.group()
            data_param_type.append(known_types.get(key, item))
        # if header_param_type != data_param_type:
        #     print("[WARNING] The parameter type of the function does not match with the documentation "
        #           "for file \'{file_name}\'.".format(file_name=data["function_name"]))