blob: 5cdb43cbc47d6430fee3a7264d628506bb575cd2 [file] [log] [blame]
import plpy
from binding import *
import binding
from pyxb.utils.domutils import BindingDOMSupport as bds
from formula import Formula
from glm.glm import __extract_family_params as extract_family_params
from datetime import datetime
from time import tzname, daylight
from collections import defaultdict
from utilities.utilities import current_user
from utilities.utilities import madlib_version
from utilities.utilities import add_postfix
from utilities.validate_args import cols_in_tbl_valid
class PMMLBuilder(object):
    """Generic base class for PMML export of any model with grouping support.

    Subclasses provide the class attributes ``OUTPUT_COLS`` and
    ``SUMMARY_COLS`` and implement the hooks ``_query_output_table``,
    ``_parse_summary``, ``_parse_output``, ``_construct_predict_spec`` and
    ``_create_single_model``. Callers run :meth:`query` followed by
    :meth:`build`; the resulting XML text is then available in ``pmml_str``.

    The _construct_predict_spec method sets up the PMML model attributes
    based on the expected prediction type: regression or classification.
    The current implementation uses the default prediction type for each
    model. In the future, the function may support user specification.
    """

    def __init__(self, schema_madlib, model_type, model_table, name_spec):
        self.schema_madlib = schema_madlib
        self.model_table = model_table
        self.model_type = model_type
        self.name_spec = name_spec
        # Populated by _format() as the last step of build().
        self.pmml_str = None

    def _validate_output_table(self):
        """Check that the model table has every column in OUTPUT_COLS."""
        cols_in_tbl_valid(self.model_table,
                          self.__class__.OUTPUT_COLS,
                          'PMML')

    def _validate_summary_table(self):
        """Check that the summary table has every column in SUMMARY_COLS."""
        cols_in_tbl_valid(add_postfix(self.model_table, '_summary'),
                          self.__class__.SUMMARY_COLS,
                          'PMML')

    def _query_summary_table(self):
        """Load the first row of the summary table into ``self.summary``."""
        summary_str = ', '.join(self.__class__.SUMMARY_COLS)
        summary_query = """
SELECT {summary_str}
FROM {table}
""".format(summary_str=summary_str,
           table=add_postfix(self.model_table, '_summary'))
        self.summary = plpy.execute(summary_query)[0]

    def _query_output_table(self):
        raise NotImplementedError

    def _parse_summary(self):
        raise NotImplementedError

    def _parse_output(self):
        raise NotImplementedError

    def _construct_predict_spec(self):
        raise NotImplementedError

    def _construct_formula(self):
        """Build ``self.formula`` and settle the names used in the PMML.

        With a user-supplied ``name_spec`` the formula is renamed from it.
        Otherwise the y name gets a ``_pmml_prediction`` suffix to avoid
        possible name conflicts; a y expression containing anything other
        than letters, digits and underscores is parenthesised first.
        """
        self.formula = Formula(self.y_str, self.x_str, self.n_coef)
        if self.name_spec is not None:
            self.formula.rename(self.name_spec)
            return
        plain_chars = '0123456789_abcdefghijklmnopqrstuvwxyz'
        if all(ch.lower() in plain_chars for ch in self.formula.y):
            self.formula.y += '_pmml_prediction'
        else:
            self.formula.y = '(' + self.formula.y + ')_pmml_prediction'

    def _build_header(self):
        """Build the PMML Header (user, application, timestamp, copyright)."""
        # Imported here because the module-level import only brings in
        # tzname and daylight.
        from time import localtime

        user = current_user()
        extension = Extension(name='user', value_=user, extender='MADlib')
        application = Application(name='MADlib',
                                  version=madlib_version(self.schema_madlib))
        now = datetime.now()
        copyright_text = "Copyright (c) {year} {user}".format(year=now.year,
                                                              user=user)
        # time.daylight only tells whether the local zone defines DST at
        # all, so indexing tzname with it would label every timestamp with
        # the DST zone name.  tm_isdst says whether DST is in effect now
        # (it can be -1 when unknown, hence the explicit comparison).
        dst_index = 1 if localtime().tm_isdst > 0 else 0
        timestamp = Timestamp(str(now) + " " + tzname[dst_index])
        self.header = Header(extension, application, timestamp,
                             copyright=copyright_text)

    def _build_data_dictionary(self):
        """Build the DataDictionary: y first, then each x, then grouping keys.

        Every x is declared continuous/double and every grouping key
        categorical/string.
        """
        data_field_forest = [DataField(*self.y_value_forest,
                                       name=self.formula.y,
                                       optype=self.y_optype,
                                       dataType=self.y_data_type)]
        data_field_forest.extend([DataField(name=x_i,
                                            optype='continuous',
                                            dataType='double')
                                  for x_i in self.formula.x])
        data_field_forest.extend([DataField(name=k,
                                            optype='categorical',
                                            dataType='string')
                                  for k in self.grouping_keys])
        self.data_dictionary = DataDictionary(
            *data_field_forest,
            numberOfFields=len(data_field_forest))

    def _build_model(self):
        """Build ``self.model``.

        Without grouping this is the single model for ``self.coef0``.  With
        grouping it is a MiningModel holding one Segment per usable row of
        ``self.grouped_coefs``, each selected by equality predicates on the
        grouping keys (first matching segment wins).
        """
        if not self.grouping_keys:  # no grouping
            self.model = self._create_single_model(self.coef0)
            return

        # MiningSchema
        mining_field_forest = [MiningField(name=self.formula.y,
                                           usageType='predicted')]
        mining_field_forest.extend(
            [MiningField(name=x_i) for x_i in self.formula.x])
        mining_field_forest.extend(
            [MiningField(name=k) for k in self.grouping_keys])
        grouping_mining_schema = MiningSchema(*mining_field_forest)

        # Segmentation: one segment for each row in the output table
        segment_forest = []
        for g in self.grouped_coefs:
            # Skip groups without a model.  A truthiness test also covers a
            # NULL (None) coef, on which len() would raise.
            if not g['coef']:
                continue
            predicate_forest = [SimplePredicate(field=k,
                                                value_=g[k],
                                                operator='equal')
                                for k in self.grouping_keys]
            if len(predicate_forest) == 1:
                predicate = predicate_forest[0]
            else:
                predicate = CompoundPredicate(*predicate_forest,
                                              booleanOperator='and')
            single_model = self._create_single_model(g['coef'])
            segment_forest.append(Segment(predicate, single_model))
        segmentation = Segmentation(*segment_forest,
                                    multipleModelMethod='selectFirst')

        # MiningModel
        self.model = MiningModel(grouping_mining_schema,
                                 segmentation,
                                 functionName=self.function)

    def _format(self, pmml):
        """Store a pretty-printed XML rendering of ``pmml`` in ``pmml_str``."""
        bds.SetDefaultNamespace(binding.Namespace)  # avoid ns:xxx prefix
        declaration = "<?xml version=\"1.0\" standalone=\"yes\"?>\n"
        xml = pmml.toDOM().toprettyxml(indent=" ")
        # Replace the first line of the generated text with our declaration.
        self.pmml_str = declaration + "\n".join(xml.split("\n")[1:])

    def query(self):
        """Validate, read and parse the summary table, then the model table."""
        # summary table
        self._validate_summary_table()
        self._query_summary_table()
        self._parse_summary()
        # output table
        self._validate_output_table()
        self._query_output_table()
        self._parse_output()

    def build(self):
        """Assemble the PMML document (version 4.1) and format it.

        Expects :meth:`query` to have been run first.
        """
        self._construct_predict_spec()
        self._construct_formula()
        self._build_header()
        self._build_data_dictionary()
        self._build_model()
        pmml = PMML(self.header, self.data_dictionary, self.model,
                    version='4.1')
        self._format(pmml)
class RegressionPMMLBuilder(PMMLBuilder):
    """Builder for the PMML 'RegressionModel' element.

    Handles the two model types listed in PREDICT_SPEC_DICT: linear
    regression ('linregr') and logistic regression ('logregr').
    """
    PREDICT_SPEC_DICT = {
        'logregr': {'function_name': 'classification',
                    'y_optype': 'categorical',
                    'y_data_type': 'boolean'},
        'linregr': {'function_name': 'regression',
                    'y_optype': 'continuous',
                    'y_data_type': 'double'}
    }
    OUTPUT_COLS = ['coef']
    SUMMARY_COLS = ['grouping_col', 'dependent_varname', 'independent_varname']

    def __init__(self, schema_madlib, model_type, model_table, name_spec):
        PMMLBuilder.__init__(
            self, schema_madlib, model_type, model_table, name_spec)
        # Built on first use by _build_mining_schema() and then reused.
        self.mining_schema = None

    def _parse_summary(self):
        """Copy variable names and the grouping column out of the summary row."""
        summary = self.summary
        self.y_str = summary['dependent_varname']
        self.x_str = summary['independent_varname']
        self.grouping_col = summary['grouping_col']
        # SELECT-list prefix: empty without grouping, "<cols>," with it.
        if self.grouping_col is None:
            self.grouping_str = ''
        else:
            self.grouping_str = self.grouping_col + ','

    def _query_output_table(self):
        """Read the grouping columns (if any) and coef from the model table."""
        query_template = """
SELECT {grouping_str} coef
FROM {model_table}
"""
        self.output = plpy.execute(query_template.format(**self.__dict__))

    def _parse_output(self):
        """Derive coef0, n_coef and grouping_keys from the first output row."""
        first_row = self.output[0]
        self.grouped_coefs = self.output
        self.coef0 = first_row['coef']
        self.n_coef = len(self.coef0)
        # Every returned column other than 'coef' is a grouping key.
        self.grouping_keys = [key for key in first_row if key != 'coef']

    def _construct_predict_spec(self):
        """Set function name and y optype/dataType from PREDICT_SPEC_DICT."""
        spec = self.__class__.PREDICT_SPEC_DICT[self.model_type]
        self.predict_spec = spec
        self.function = spec['function_name']
        if self.function == 'classification':
            self.y_value_forest = [Value(value_='True'),
                                   Value(value_='False')]
        else:
            self.y_value_forest = []
        self.y_optype = spec['y_optype']
        self.y_data_type = spec['y_data_type']

    def _build_mining_schema(self):
        """Create the MiningSchema (predicted y plus every x) once."""
        if self.mining_schema is not None:
            return
        fields = [MiningField(name=self.formula.y, usageType='predicted')]
        fields.extend(MiningField(name=x_name) for x_name in self.formula.x)
        self.mining_schema = MiningSchema(*fields)

    def _create_numeric_predictors(self, coef):
        """Return one NumericPredictor per coefficient, named by position."""
        return [NumericPredictor(name=self.formula.x[idx], coefficient=value)
                for idx, value in enumerate(coef)]

    def _create_model_regression(self, numeric_predictor_forest):
        """Return a RegressionModel with a single zero-intercept table."""
        table = RegressionTable(*numeric_predictor_forest, intercept='0')
        return RegressionModel(self.mining_schema,
                               table,
                               functionName=self.function)

    def _create_model_classification(self, numeric_predictor_forest):
        """Return a softmax RegressionModel with True and False tables.

        The predictors go into the True table; the False table is empty.
        """
        true_table = RegressionTable(*numeric_predictor_forest,
                                     targetCategory=True, intercept='0')
        false_table = RegressionTable(targetCategory=False, intercept='0')
        return RegressionModel(self.mining_schema,
                               true_table,
                               false_table,
                               functionName=self.function,
                               normalizationMethod='softmax')

    def _create_single_model(self, coef):
        """Build the model for one coefficient vector.

        Returns None when self.function is neither 'regression' nor
        'classification'.
        """
        self._build_mining_schema()
        predictors = self._create_numeric_predictors(coef)
        if self.function == 'regression':
            return self._create_model_regression(predictors)
        if self.function == 'classification':
            return self._create_model_classification(predictors)
        return None
class GeneralRegressionPMMLBuilder(RegressionPMMLBuilder):
    """Builder for the PMML 'GeneralRegressionModel' element.

    Provides the pieces shared by its subclasses: the ParameterList,
    CovariateList and PPMatrix, each built once and then reused.
    """

    def __init__(self, schema_madlib, model_type, model_table, name_spec):
        RegressionPMMLBuilder.__init__(
            self, schema_madlib, model_type, model_table, name_spec)
        self.parameter_list = None
        self.covariate_list = None
        self.ppmatrix = None

    def _build_parameter_list(self):
        """Create parameters p0 ('Intercept') and p1..pN labelled by x name."""
        if self.parameter_list is not None:
            return
        parameters = [Parameter(name='p0', label='Intercept')]
        for position, x_name in enumerate(self.formula.x):
            parameters.append(
                Parameter(name="p" + str(position + 1), label=x_name))
        self.parameter_list = ParameterList(*parameters)

    def _build_covariate_list(self):
        """Create a CovariateList with one Predictor per x."""
        if self.covariate_list is not None:
            return
        predictors = [Predictor(name=x_name) for x_name in self.formula.x]
        self.covariate_list = CovariateList(*predictors)

    def _build_ppmatrix(self):
        """Create the PPMatrix tying the i-th x (0-based) to parameter p(i+1)."""
        if self.ppmatrix is not None:
            return
        cells = [PPCell(value_="1",
                        predictorName=x_name,
                        parameterName="p" + str(position + 1))
                 for position, x_name in enumerate(self.formula.x)]
        self.ppmatrix = PPMatrix(*cells)
class GLMPMMLBuilder(GeneralRegressionPMMLBuilder):
    """Builder for PMML modelType='generalizedLinear'.

    The current implementation supports the GLM families Gaussian,
    Binomial, Gamma, Poisson and Inverse Gaussian (the keys of
    PREDICT_SPEC_DICT) and the link functions that are keys of
    LINK_SPEC_DICT.
    """
    PREDICT_SPEC_DICT = {
        'gaussian': {'function_name': 'regression',
                     'y_optype': 'continuous',
                     'y_data_type': 'double',
                     'distribution': 'normal'},
        'binomial': {'function_name': 'classification',
                     'y_optype': 'categorical',
                     'y_data_type': 'boolean',
                     'distribution': 'binomial'},
        'gamma': {'function_name': 'regression',
                  'y_optype': 'continuous',
                  'y_data_type': 'double',
                  'distribution': 'gamma'},
        'poisson': {'function_name': 'regression',
                    'y_optype': 'continuous',
                    'y_data_type': 'integer',
                    'distribution': 'poisson'},
        'inverse_gaussian': {'function_name': 'regression',
                             'y_optype': 'continuous',
                             'y_data_type': 'double',
                             'distribution': 'igauss'}
    }
    # Maps each supported link name to GeneralRegressionModel attributes.
    LINK_SPEC_DICT = {
        'inverse': {'linkFunction': 'power',
                    'linkParameter': '-1'},
        'log': {'linkFunction': 'log'},
        'identity': {'linkFunction': 'identity'},
        'probit': {'linkFunction': 'probit'},
        'logit': {'linkFunction': 'logit'},
        'sqrt': {'linkFunction': 'power',
                 'linkParameter': '0.5'},
        'sqr_inverse': {'linkFunction': 'power',
                        'linkParameter': '-2'}
    }
    SUMMARY_COLS = ['grouping_col',
                    'dependent_varname',
                    'independent_varname',
                    'family_params'
                    ]

    def __init__(self, schema_madlib, model_type, model_table, name_spec):
        GeneralRegressionPMMLBuilder.__init__(
            self, schema_madlib, model_type, model_table, name_spec)

    def _parse_summary(self):
        """Parse names/grouping, then the family and link of the model."""
        # The inherited parser sets y_str, x_str, grouping_col, grouping_str.
        GeneralRegressionPMMLBuilder._parse_summary(self)
        family_params = extract_family_params(
            self.schema_madlib, self.summary['family_params'])
        self.family_params_dict = family_params
        self.family = family_params['family']
        self.link = family_params['link']
        self.link_spec = self.__class__.LINK_SPEC_DICT[self.link]

    def _construct_predict_spec(self):
        """Set function, distribution and y optype/dataType for the family."""
        spec = self.__class__.PREDICT_SPEC_DICT[self.family]
        self.predict_spec = spec
        self.function = spec['function_name']
        binomial_classifier = (self.function == 'classification' and
                               self.family == 'binomial')
        if binomial_classifier:
            self.y_value_forest = [Value(value_='True'),
                                   Value(value_='False')]
        else:
            self.y_value_forest = []
        self.y_optype = spec['y_optype']
        self.distribution = spec['distribution']
        self.y_data_type = spec['y_data_type']

    def _create_single_model(self, coef):
        """Return the GeneralRegressionModel for one coefficient vector.

        The ParamMatrix holds a zero cell for p0 followed by one cell per
        coefficient; for classification every cell targets category True.
        """
        self._build_mining_schema()
        self._build_parameter_list()
        self._build_covariate_list()
        self._build_ppmatrix()

        tag_target = self.function == 'classification'

        def make_pcell(parameter_name, beta):
            attributes = dict(parameterName=parameter_name, beta=beta, df='1')
            if tag_target:
                attributes['targetCategory'] = True
            return PCell(**attributes)

        # pcells
        pcell_forest = [make_pcell('p0', '0')]
        for position, value in enumerate(coef):
            pcell_forest.append(make_pcell("p" + str(position + 1), value))

        return GeneralRegressionModel(self.mining_schema,
                                      self.parameter_list,
                                      FactorList(),
                                      self.covariate_list,
                                      self.ppmatrix,
                                      ParamMatrix(*pcell_forest),
                                      targetVariableName=self.formula.y,
                                      modelType='generalizedLinear',
                                      distribution=self.distribution,
                                      functionName=self.function,
                                      **self.link_spec)
class MultiClassRegressionPMMLBuilder(GeneralRegressionPMMLBuilder):
    """Common base for the multinomial-logistic and ordinal builders.

    Subclasses are expected to set ``self.cate_list`` before
    _construct_predict_spec runs.
    """

    def __init__(self, schema_madlib, model_type, model_table, name_spec):
        GeneralRegressionPMMLBuilder.__init__(
            self, schema_madlib, model_type, model_table, name_spec)

    def _construct_predict_spec(self):
        """Declare y as a categorical string with one Value per category."""
        self.function = 'classification'
        categories = []
        for category in self.cate_list:
            categories.append(Value(value_=category))
        self.y_value_forest = categories
        self.y_optype = 'categorical'
        self.y_data_type = 'string'
class OrdinalRegressionPMMLBuilder(MultiClassRegressionPMMLBuilder):
    """Builder for PMML modelType='ordinalMultinomial'.

    The signs of the feature coefficients differ between PMML and the
    madlib.ordinal() output, so they are negated when written out.
    """
    OUTPUT_COLS = ['coef_threshold', 'coef_feature']
    SUMMARY_COLS = ['grouping_col',
                    'dependent_varname',
                    'independent_varname',
                    'link_func',
                    'category_list']

    def __init__(self, schema_madlib, model_type, model_table, name_spec):
        MultiClassRegressionPMMLBuilder.__init__(
            self, schema_madlib, model_type, model_table, name_spec)

    def _parse_summary(self):
        """Read names, grouping, link function and categories from summary."""
        summary = self.summary
        self.y_str = summary['dependent_varname']
        self.x_str = summary['independent_varname']
        self.grouping_col = summary['grouping_col']
        if self.grouping_col is None:
            self.grouping_str = ''
        else:
            self.grouping_str = self.grouping_col + ','
        self.link = summary['link_func']
        self.cate_list = summary['category_list'].split(',')

    def _query_output_table(self):
        """Fetch feature||threshold coefficients (as coef) and coef_feature."""
        query_template = """
SELECT
{grouping_str}
coef_feature || coef_threshold as coef,
coef_feature
FROM {model_table}
"""
        self.output_query = query_template.format(**self.__dict__)
        self.output = plpy.execute(self.output_query)

    def _parse_output(self):
        """Derive coef0, n_coef (feature count) and the grouping keys."""
        first_row = self.output[0]
        self.grouped_coefs = self.output
        self.coef0 = first_row['coef']
        self.n_coef = len(first_row['coef_feature'])
        non_group_cols = ('coef', 'coef_feature')
        self.grouping_keys = [key for key in first_row
                              if key not in non_group_cols]

    def _create_single_model(self, coef):
        """Return the GeneralRegressionModel for one combined coef vector.

        ``coef`` holds the feature coefficients first and the thresholds
        after them.  Each threshold becomes a p0 cell for the category at
        the same position in cate_list; each feature coefficient becomes a
        p(i+1) cell with its sign flipped.
        """
        n_features = self.formula.n_coef
        thresholds = coef[n_features:]
        features = coef[:n_features]

        self._build_mining_schema()
        self._build_parameter_list()
        self._build_covariate_list()
        self._build_ppmatrix()

        # pcells
        pcell_forest = [PCell(parameterName='p0',
                              beta=value,
                              df='1',
                              targetCategory=self.cate_list[position])
                        for position, value in enumerate(thresholds)]
        # -value is used for the feature coefficients
        pcell_forest += [PCell(parameterName="p" + str(position + 1),
                               beta=-value,
                               df='1')
                         for position, value in enumerate(features)]

        return GeneralRegressionModel(self.mining_schema,
                                      self.parameter_list,
                                      FactorList(),
                                      self.covariate_list,
                                      self.ppmatrix,
                                      ParamMatrix(*pcell_forest),
                                      targetVariableName=self.formula.y,
                                      modelType='ordinalMultinomial',
                                      functionName=self.function,
                                      cumulativeLink=self.link)
class MultinomRegressionPMMLBuilder(MultiClassRegressionPMMLBuilder):
    """Builder class for PMML modelType='multinomialLogistic'.

    The model table has one row per (group, category); rows are regrouped
    so that each group carries a ``{category: coef}`` dictionary.
    """
    OUTPUT_COLS = ['category', 'coef']
    SUMMARY_COLS = ['grouping_col',
                    'dependent_varname',
                    'independent_varname',
                    'category_list',
                    'ref_category']

    def __init__(self, schema_madlib, model_type, model_table, name_spec):
        MultiClassRegressionPMMLBuilder.__init__(
            self, schema_madlib, model_type, model_table, name_spec)

    def _parse_summary(self):
        """Read names, grouping, categories and reference category."""
        self.y_str = self.summary['dependent_varname']
        self.x_str = self.summary['independent_varname']
        self.grouping_col = self.summary['grouping_col']
        self.grouping_str = ('' if self.grouping_col is None
                             else self.grouping_col + ',')
        self.cate_list = self.summary['category_list'].split(',')
        self.ref_cate = self.summary['ref_category']

    def _query_output_table(self):
        """Fetch grouping columns (if any), category and coef for every row."""
        self.output_query = """
SELECT
{grouping_str}
category,
coef
FROM {model_table}
""".format(**self.__dict__)
        self.output = plpy.execute(self.output_query)

    def _parse_output(self):
        """Regroup the per-category rows by grouping values.

        Sets ``grouped_coefs`` to a list of dicts holding the grouping
        values plus ``'coef': {category: coef}``, ``coef0`` to the
        dictionary of one group, and ``n_coef`` to the length of one
        coefficient vector.
        """
        self.grouping_keys = [k for k in self.output[0]
                              if k not in ('coef', 'category')]
        coef_dict = defaultdict(dict)
        for r in self.output:
            # extract grouping values as key
            grp_val = tuple(r[k] for k in self.grouping_keys)
            # grouping values -> { category -> coef}
            coef_dict[grp_val][r['category']] = r['coef']

        # items()/next(iter(...)) are used instead of iteritems() and
        # values()[0] so the code does not depend on Python 2 dict methods.
        self.grouped_coefs = []
        for grp_val, coef in coef_dict.items():
            group = dict(zip(self.grouping_keys, grp_val))
            group['coef'] = coef
            self.grouped_coefs.append(group)
        self.coef0 = next(iter(coef_dict.values()))
        self.n_coef = len(next(iter(self.coef0.values())))

    def _create_single_model(self, coef):
        """Return the GeneralRegressionModel for one ``{category: coef}`` dict.

        For each category a zero p0 cell is emitted, followed by one
        p(i+1) cell per coefficient, all tagged with that category.
        """
        self._build_mining_schema()
        self._build_parameter_list()
        self._build_covariate_list()
        self._build_ppmatrix()
        # pcells
        pcell_forest = []
        for cate, coef_per_cate in coef.items():
            pcell_forest.append(PCell(parameterName="p0",
                                      beta=0, df='1', targetCategory=cate))
            for i, c in enumerate(coef_per_cate):
                pcell_forest.append(PCell(parameterName="p" + str(i + 1),
                                          beta=c, df='1',
                                          targetCategory=cate))
        return GeneralRegressionModel(self.mining_schema,
                                      self.parameter_list,
                                      FactorList(),
                                      self.covariate_list,
                                      self.ppmatrix,
                                      ParamMatrix(*pcell_forest),
                                      targetVariableName=self.formula.y,
                                      modelType='multinomialLogistic',
                                      functionName=self.function)
class DecisionTreePMMLBuilder(PMMLBuilder):
    """Builder class for PMML model 'TreeModel'.

    The tree is read from the model table as a flattened rpart-style
    "frame" matrix plus a flattened split-threshold matrix, and converted
    into nested PMML Node elements.
    """
    OUTPUT_COLS = ['tree']
    SUMMARY_COLS = ['grouping_cols', 'dependent_varname',
                    'independent_varnames', 'cat_features', 'con_features',
                    'is_classification', 'dependent_var_levels',
                    'dependent_var_type', 'independent_var_types']
    # Maps a (lower-cased) SQL type name from the summary table to the
    # PMML dataType to declare; a type missing here raises KeyError.
    DATA_TYPE_DICT = {
        'boolean': 'boolean',
        'char': 'string',
        'character': 'string',
        'varchar': 'string',
        'character varying': 'string',
        'text': 'string',
        'smallint': 'integer',
        'int2': 'integer',
        'integer': 'integer',
        'int4': 'integer',
        'int': 'integer',
        'bigint': 'integer',
        'int8': 'integer',
        'float8': 'double',
        'double precision': 'double'
    }

    def __init__(self, schema_madlib, model_type, model_table, name_spec):
        PMMLBuilder.__init__(self, schema_madlib, model_type, model_table,
                             name_spec)
        # Built on first use by _build_mining_schema() and then reused.
        self.mining_schema = None

    def _parse_summary(self):
        """Read variable names, feature kinds, levels and types from summary."""
        self.y_str = self.summary['dependent_varname']
        # assume that summary table sort the independent varnames (cat, con)
        self.x_str = self.summary['independent_varnames']
        self.x = [s.strip() for s in self.x_str.split(',')]
        self.n_coef = len(self.x)
        self.grouping_col = self.summary['grouping_cols']
        self.grouping_str = ('' if self.grouping_col is None
                             else self.grouping_col + ',')
        self.cat_features = [s.strip()
                             for s in self.summary['cat_features'].split(',')
                             if s.strip() != '']
        self.con_features = [s.strip()
                             for s in self.summary['con_features'].split(',')
                             if s.strip() != '']
        self.n_cats = len(self.cat_features)
        self.is_classification = self.summary['is_classification']
        if self.is_classification:
            # remove possible quotes around the values
            self.y_levels = [
                s.strip().strip("\"")
                for s in self.summary['dependent_var_levels'].split(',')]
        self.y_data_type = self.summary['dependent_var_type']
        self.y_data_type = \
            self.__class__.DATA_TYPE_DICT[self.y_data_type.lower()]
        # figure out whether each x is categorical or continuous
        self.x_optype = ['categorical' if x in self.cat_features
                         else 'continuous'
                         for x in self.x]
        self.x_data_type = [
            s.strip()
            for s in self.summary['independent_var_types'].split(',')]
        for i, t in enumerate(self.x_data_type):
            self.x_data_type[i] = self.__class__.DATA_TYPE_DICT[t.lower()]

    # load the tree frame into memory
    def _query_output_table(self):
        """
        Re-use _convert_to_rpart_format and _get_split_thresholds
        functions to extract the tree information, and store in memory.
        """
        output_query = """
SELECT
{grouping_str}
array_to_string(
{schema_madlib}._convert_to_rpart_format(
tree,
{n_cats}
),
',') AS frame,
cat_levels_in_text,
cat_n_levels,
array_to_string(
{schema_madlib}._get_split_thresholds(
tree,
{n_cats}
),
',') AS thresholds
FROM
{model_table}
""".format(**self.__dict__)
        self.output = plpy.execute(output_query)

    def _parse_output(self):
        """Split each output row into tree data ('coef') and grouping values.

        Every entry of ``grouped_coefs`` is a dict whose 'coef' key holds
        the tree data of one row; the remaining columns of that row (the
        grouping columns) are copied in alongside it.
        """
        tree_cols = ['frame', 'cat_levels_in_text',
                     'cat_n_levels', 'thresholds']
        self.grouped_coefs = []
        for g in self.output:
            entry = {'coef': {'frame': g['frame'],
                              'cat_levels': g['cat_levels_in_text'],
                              'cat_n': g['cat_n_levels'],
                              'thresholds': g['thresholds']}}
            entry.update((k, v) for k, v in g.items() if k not in tree_cols)
            self.grouped_coefs.append(entry)
        self.coef0 = self.grouped_coefs[0]['coef']
        self.grouping_keys = [k for k in self.output[0]
                              if k not in tree_cols]

    def _construct_predict_spec(self):
        """Construct model attributes: y values, function name and y optype."""
        self.y_value_forest = []
        if self.is_classification:
            self.y_value_forest.extend(Value(value_=s.strip('"'))
                                       for s in self.y_levels)
        self.function = ('classification' if self.is_classification
                         else 'regression')
        self.y_optype = ('categorical' if self.is_classification
                         else 'continuous')

    def _build_mining_schema(self):
        """Create the MiningSchema (predicted y plus every x) once."""
        if self.mining_schema is None:
            mining_field_forest = [MiningField(name=self.formula.y,
                                               usageType='predicted')]
            for x_i in self.x:
                mining_field_forest.append(MiningField(name=x_i))
            self.mining_schema = MiningSchema(*mining_field_forest)

    def _create_single_model(self, coef0):
        """Return the TreeModel for the tree data of one output row."""
        self._build_mining_schema()     # construct MiningSchema
        node = self._build_node(coef0)  # construct Node tree
        return TreeModel(self.mining_schema, node,
                         functionName=self.function,
                         algorithmName='cart',
                         splitCharacteristic='binarySplit')

    # Override
    def _build_data_dictionary(self):
        """
        The independent variables can be either categorical or
        continuous, so each x is declared with its own optype and dataType.
        """
        # Use the x names parsed from the summary table in the formula too
        self.formula.x = self.x
        data_field_forest = [DataField(*self.y_value_forest,
                                       name=self.formula.y,
                                       optype=self.y_optype,
                                       dataType=self.y_data_type)]
        data_field_forest.extend(DataField(name=self.x[i],
                                           optype=self.x_optype[i],
                                           dataType=self.x_data_type[i])
                                 for i in range(len(self.x)))
        data_field_forest.extend(DataField(name=k,
                                           optype='categorical',
                                           dataType='string')
                                 for k in self.grouping_keys)
        self.data_dictionary = DataDictionary(
            *data_field_forest,
            numberOfFields=len(data_field_forest))

    def _build_node(self, coef):
        """Construct the hierarchy of nodes for one tree.

        ``coef`` is a dict with keys 'frame', 'cat_levels', 'cat_n' and
        'thresholds', as produced by _parse_output.
        - The frame is the frame matrix in madlib.rpart.
        - A method similar to madlib.rpart is used to parse the structure.
        For the meaning of each column of the frame matrix, see the manual
        of rpart and madlib.rpart.  As used below: column 0 is the split
        feature index (negative for a leaf), column 1 the record count,
        column 4 the regression score and column 7 the number of surrogate
        splits; for classification, columns from 9 on hold the per-level
        record counts followed by the values compared to pick the score.

        Returns the root Node.
        """
        # all node information, see madlib.rpart, double[][]
        frame = self._get_one_frame_matrix(coef['frame'])
        row_id = self._frame_row_id(frame)
        id_to_row = dict((rid, i) for i, rid in enumerate(row_id))
        cat_n = coef['cat_n']  # number of categorical levels, int[]
        # categorical variable levels, string[][]
        cat_levels = self._get_cat_levels(coef['cat_levels'], cat_n)
        # split values, double[][], including surrogates
        thresh = self._get_one_thresh_matrix(coef['thresholds'])

        # Map from row number to indices of the primary and all surrogates
        row_to_thresh = {}
        curr_thresh_index = 0
        for row in range(len(frame)):
            if frame[row][0] >= 0:
                n_surr = int(frame[row][7])
                row_to_thresh[row] = range(curr_thresh_index,
                                           1 + n_surr + curr_thresh_index)
                curr_thresh_index += 1 + n_surr

        # Construct a predicate from i-th row of frame
        # For continuous variables, we use SimplePredicate
        # However, for categorical variables, we have to use
        # CompoundPredicate with the booleanOperator="or"
        def construct_predicate(i):
            if i == 0:  # root node always True
                return True_()
            # Children of id p are 2p+1 and 2p+2; floor division recovers
            # p from either child (true division would give p + 0.5 for a
            # right child wherever "/" is not integer division).
            parent = id_to_row[(row_id[i] - 1) // 2]
            is_left = 2 * row_id[parent] + 1 == row_id[i]
            sibling = right_node(parent) if is_left else left_node(parent)
            thresh_indices = row_to_thresh[parent]
            predicate_forests = [
                construct_predicate_using_thresh_row(
                    thresh[thresh_index],
                    is_left,
                    (self.x[thresh[thresh_index][0]] in self.cat_features))
                for thresh_index in thresh_indices]
            # Final fallback of the surrogate chain: the child with the
            # larger column-2 value (the right child on a tie) gets True.
            is_majority = (frame[i][2] > frame[sibling][2] or
                           (frame[i][2] == frame[sibling][2] and
                            not is_left))
            if is_majority:
                predicate_forests.append(True_())
            else:
                predicate_forests.append(False_())
            return CompoundPredicate(*predicate_forests,
                                     booleanOperator='surrogate')

        # construct a predicate from a row in thresh
        def construct_predicate_using_thresh_row(thresh_row, is_left,
                                                 is_split_categorical):
            """consume a row in thresh

            NOTE: always compound with False due to PyXB bug which loses
            the order of internal predicates if some are compound and
            others are not
            """
            if is_split_categorical:
                levels = cat_levels[thresh_row[0]]
                if is_left:
                    selected_level_indices = range(int(thresh_row[1] + 1),
                                                   len(levels))
                else:
                    selected_level_indices = range(0, int(thresh_row[1] + 1))
                simple_predicates = [
                    SimplePredicate(field=self.x[thresh_row[0]],
                                    operator="equal",
                                    value_=levels[l])
                    for l in selected_level_indices]
                if len(simple_predicates) == 1:
                    simple_predicates.append(False_())
            else:
                op = 'greaterThan' if is_left else 'lessOrEqual'
                simple_predicates = [
                    SimplePredicate(field=self.x[thresh_row[0]],
                                    operator=op,
                                    value_=thresh_row[1]),
                    False_()]
            return CompoundPredicate(*simple_predicates,
                                     booleanOperator="or")

        # compute the score of the node
        def compute_node_score(i):
            if self.is_classification:
                n_y_level = len(self.y_levels)
                s = 9 + n_y_level
                max_prob = 0
                max_level = 0
                for j in range(s, s + n_y_level):
                    if frame[i][j] > max_prob:
                        max_prob = frame[i][j]
                        max_level = j - s
                return self.y_levels[max_level]
            else:
                return frame[i][4]

        # compute categorical score distributions
        # only for classification
        def compute_score_distributions(i):
            n_y_level = len(self.y_levels)
            s = 9
            ans = []
            for j in range(s, s + n_y_level):
                ans.append(ScoreDistribution(
                    value_=self.y_levels[j - s].strip('"'),
                    recordCount=int(frame[i][j])))
            return ans

        # get the row number of left son
        def left_node(i):
            return id_to_row[2 * row_id[i] + 1]

        # get the row number of right son
        def right_node(i):
            return id_to_row[2 * row_id[i] + 2]

        # Recursively construct the i-th node using the i-th row of frame
        def construct_node(i):
            predicate = construct_predicate(i)
            score = compute_node_score(i)
            record_count = frame[i][1]
            choice = []
            if self.is_classification:
                choice.extend(compute_score_distributions(i))
            if frame[i][0] >= 0:  # splitting node
                choice.extend([construct_node(left_node(i)),
                               construct_node(right_node(i))])
            return Node(predicate, *choice,
                        id=row_id[i],
                        score=score,
                        recordCount=int(record_count))

        return construct_node(0)

    # Helper function used only by _build_node
    def _get_one_frame_matrix(self, frame_string):
        """
        frame from the _query_output_table is a comma-separated,
        column-major flattening of the frame matrix; convert it into a
        row-major list of lists to simplify the following processing.

        The matrix has 10 + 2 * (number of y levels) columns for
        classification and 8 columns for regression.
        """
        frame_vector = [float(x) for x in frame_string.split(',')]
        n = len(frame_vector)
        ncol = 10 + 2 * len(self.y_levels) if self.is_classification else 8
        # Floor division keeps nrow an int regardless of division semantics.
        nrow = n // ncol
        # column wise assignment: element (i, j) sits at j * nrow + i
        return [[frame_vector[j * nrow + i] for j in range(ncol)]
                for i in range(nrow)]

    # Helper function used only by _build_node
    def _get_one_thresh_matrix(self, thresh_string):
        """
        thresh from the _query_output_table is a comma-separated,
        column-major flattening of a two-column matrix; convert it into a
        list of [feature_index (int), split_value (float)] rows.  An empty
        string yields an empty list.
        """
        thresh_vector = ([] if thresh_string == ''
                         else [float(x) for x in thresh_string.split(',')])
        ncol = 2
        # Floor division keeps nrow an int regardless of division semantics.
        nrow = len(thresh_vector) // ncol
        # column wise assignment: first column is 0..nrow-1, second follows
        return [[int(thresh_vector[i]), thresh_vector[nrow + i]]
                for i in range(nrow)]

    # Helper function used only by _build_node
    # Extract row_id from the frame matrix. See madlib.rpart
    # for more details.
    def _frame_row_id(self, frame_matrix):
        """Return the node id of every frame row.

        Rows are visited in order as a pre-order walk: the root has id 0
        and the children of id r get ids 2r+1 (left) and 2r+2 (right).  A
        row whose first column is negative is a leaf.  If the frame ends
        before the walk does, the walk simply stops.
        """
        nrow = len(frame_matrix)
        row_id = [0 for i in range(nrow)]

        def compute_id(c, r):
            # Guard before indexing, so an empty or truncated frame ends
            # the walk instead of raising IndexError.
            if c[0] >= nrow:
                return
            row = c[0]
            row_id[row] = r
            c[0] += 1
            if frame_matrix[row][0] < 0:
                return
            compute_id(c, 2 * r + 1)  # left
            compute_id(c, 2 * r + 2)  # right

        count = [0]  # one-element list used as a mutable row cursor
        compute_id(count, 0)
        return row_id

    # Helper function for working with the frame matrix
    def _frame_row_names(self, frame_matrix):
        """Return, per row, the name of the variable used for splitting.

        Leaf rows (negative first column) map to None.
        """
        nrow = len(frame_matrix)
        return [None if frame_matrix[i][0] < 0
                else self.x[int(round(frame_matrix[i][0]))]
                for i in range(nrow)]

    # Helper function used only by _build_node
    def _get_cat_levels(self, cat_level_vector, cat_n):
        """Cut the flat level list into one list per categorical feature.

        ``cat_n`` gives the number of levels of each feature, in order;
        returns [] when ``cat_n`` is None.
        """
        if cat_n is None:
            return []
        cat_level_array = []
        count = 0
        for n in cat_n:
            cat_level_array.append(
                [cat_level_vector[count + i] for i in range(n)])
            count += n
        return cat_level_array
class RandomForestPMMLBuilder(DecisionTreePMMLBuilder):
    """Builder class for PMML model 'ForestModel'.

    Each group of the forest becomes a MiningModel whose segments are the
    individual trees, combined by majority vote (classification) or
    averaging (regression).
    """

    def __init__(self, schema_madlib, model_type, model_table, name_spec):
        # The parent constructor already initialises mining_schema to None.
        DecisionTreePMMLBuilder.__init__(self, schema_madlib, model_type,
                                         model_table, name_spec)

    def _parse_summary(self):
        """Parse the summary as for a decision tree, plus the group table name.

        The summary columns are the same as the parent's, so the parsing is
        delegated instead of being repeated here.
        """
        DecisionTreePMMLBuilder._parse_summary(self)
        # Derived with add_postfix, the same helper used for the '_summary'
        # table, so both companion table names are built the same way.
        self.model_table_group = add_postfix(self.model_table, '_group')

    def _query_output_table(self):
        """
        Re-use _convert_to_rpart_format and _get_split_thresholds
        functions to extract the tree information, and store in memory.
        The model table is joined with the group table on gid.
        """
        output_query = """
SELECT
{grouping_str}
gid,
array_to_string(
{schema_madlib}._convert_to_rpart_format(
tree,
{n_cats}
),
',') AS frame,
cat_levels_in_text,
cat_n_levels,
array_to_string(
{schema_madlib}._get_split_thresholds(
tree,
{n_cats}
),
',') AS thresholds
FROM
{model_table}
JOIN
{model_table_group}
using (gid)
""".format(**self.__dict__)
        self.output = plpy.execute(output_query)

    def _parse_output(self):
        """Group the trees of the forest by gid.

        grouped_coefs groups trees based on gid: entry ``gid - 1`` holds
        the trees of group ``gid``.  The difference from decision trees is
        that key 'coef' points to a list of dictionaries instead of a
        single dictionary; each dictionary contains information on one
        tree of the forest.  A gid without rows keeps an empty list.
        """
        non_group_cols = ['frame', 'cat_levels_in_text',
                          'cat_n_levels', 'thresholds', 'gid']
        self.gids = [g['gid'] for g in self.output]
        self.grouped_coefs = [{'coef': []} for _ in range(max(self.gids))]
        for g in self.output:
            group = self.grouped_coefs[g['gid'] - 1]
            group['coef'].append({'frame': g['frame'],
                                  'cat_levels': g['cat_levels_in_text'],
                                  'cat_n': g['cat_n_levels'],
                                  'thresholds': g['thresholds'],
                                  'gid': g['gid']})
            # Copy the grouping column values next to the tree list.
            group.update((k, v) for k, v in g.items()
                         if k not in non_group_cols)
        self.coef0 = self.grouped_coefs[0]['coef']
        self.grouping_keys = [k for k in self.output[0]
                              if k not in non_group_cols]

    def _create_single_model(self, coef0):
        """Return a MiningModel combining every tree in ``coef0``.

        ``coef0`` is the list of per-tree dictionaries of one group.
        """
        self._build_mining_schema()  # construct MiningSchema
        # Create segmentation of tree models, combined based on
        # classification or regression
        segment_forest = []
        for each_coef in coef0:
            node = self._build_node(each_coef)  # construct Node tree
            single_model = TreeModel(self.mining_schema, node,
                                     functionName=self.function,
                                     algorithmName='randomForest_model',
                                     splitCharacteristic='binarySplit')
            segment_forest.append(Segment(True_(), single_model))
        multiple_model_method = ('majorityVote' if self.is_classification
                                 else 'average')
        segmentation = Segmentation(*segment_forest,
                                    multipleModelMethod=multiple_model_method)
        return MiningModel(self.mining_schema,
                           segmentation,
                           functionName=self.function)