| import plpy |
| |
| from binding import * |
| import binding |
| from pyxb.utils.domutils import BindingDOMSupport as bds |
| from formula import Formula |
| |
| from glm.glm import __extract_family_params as extract_family_params |
| |
| from datetime import datetime |
| from time import tzname, daylight |
| from collections import defaultdict |
| |
| from utilities.utilities import current_user |
| from utilities.utilities import madlib_version |
| from utilities.utilities import add_postfix |
| from utilities.validate_args import cols_in_tbl_valid |
| |
| |
class PMMLBuilder(object):
    """Generic base class for PMML export of any model with grouping support.

    Subclasses are expected to provide the ``OUTPUT_COLS`` and
    ``SUMMARY_COLS`` class attributes, the ``_query_output_table``,
    ``_parse_summary``, ``_parse_output`` and ``_construct_predict_spec``
    hooks, and a ``_create_single_model(coef)`` method.

    The _construct_predict_spec method will set up PMML model attribute based
    on expected prediction type: regression or classification. Current
    implementation uses default prediction type for each model. In the future,
    the function may support user specification.

    Typical use is ``query()`` followed by ``build()``; the resulting XML
    text is then available in ``pmml_str``.
    """
    def __init__(self, schema_madlib, model_type, model_table, name_spec):
        self.schema_madlib = schema_madlib
        self.model_table = model_table
        self.model_type = model_type
        self.name_spec = name_spec
        # Filled in by _format() once build() has run.
        self.pmml_str = None

    def _validate_output_table(self):
        """Check that the model table has every column in OUTPUT_COLS."""
        cols_in_tbl_valid(self.model_table,
                          type(self).OUTPUT_COLS,
                          'PMML')

    def _validate_summary_table(self):
        """Check that the '<model_table>_summary' table has SUMMARY_COLS."""
        cols_in_tbl_valid(add_postfix(self.model_table, '_summary'),
                          type(self).SUMMARY_COLS,
                          'PMML')

    def _query_summary_table(self):
        """Read SUMMARY_COLS from the summary table into self.summary.

        Only the first returned row is kept.
        """
        summary_str = ', '.join(type(self).SUMMARY_COLS)
        summary_query = """
            SELECT {summary_str}
            FROM {table}
        """.format(summary_str=summary_str,
                   table=add_postfix(self.model_table, '_summary'))
        self.summary = plpy.execute(summary_query)[0]

    def _query_output_table(self):
        """Hook: load the model table. Must be overridden."""
        raise NotImplementedError

    def _parse_summary(self):
        """Hook: turn self.summary into builder attributes. Must be overridden."""
        raise NotImplementedError

    def _parse_output(self):
        """Hook: turn self.output into builder attributes. Must be overridden."""
        raise NotImplementedError

    def _construct_predict_spec(self):
        """Hook: set function/optype/data type attributes. Must be overridden."""
        raise NotImplementedError

    def _construct_formula(self):
        """Create self.formula and pick the name of the predicted field.

        When a name_spec was supplied the formula is renamed with it.
        Otherwise a '_pmml_prediction' suffix is appended to the dependent
        variable name, wrapping it in parentheses first if it contains any
        character other than letters, digits and underscore.
        """
        self.formula = Formula(self.y_str, self.x_str, self.n_coef)
        if self.name_spec is not None:
            self.formula.rename(self.name_spec)
            return

        # change the y column name to avoid possible
        # name conflicts
        plain_chars = '0123456789_abcdefghijklmnopqrstuvwxyz'
        if all(ch.lower() in plain_chars for ch in self.formula.y):
            self.formula.y += '_pmml_prediction'
        else:
            self.formula.y = '(' + self.formula.y + ')_pmml_prediction'

    def _build_header(self):
        """Build the PMML Header (user, application, timestamp, copyright)."""
        # Local import: the module-level import only brings in tzname/daylight.
        from time import localtime

        user = current_user()
        extension = Extension(name='user', value_=user, extender='MADlib')
        application = Application(name='MADlib',
                                  version=madlib_version(self.schema_madlib))
        now = datetime.now()
        copyright_text = "Copyright (c) {year} {user}".format(year=now.year,
                                                              user=user)
        # time.daylight only tells whether the local zone *defines* a DST
        # variant, not whether DST is in effect right now. tm_isdst of the
        # current local time selects the name that actually applies.
        tz_label = tzname[1 if localtime().tm_isdst > 0 else 0]
        timestamp = Timestamp(str(now) + " " + tz_label)
        self.header = Header(extension, application, timestamp,
                             copyright=copyright_text)

    def _build_data_dictionary(self):
        """Build the DataDictionary: target, predictors, then grouping keys.

        Predictors are declared continuous/double and grouping keys
        categorical/string.
        """
        data_field_forest = [DataField(*self.y_value_forest,
                                       name=self.formula.y,
                                       optype=self.y_optype,
                                       dataType=self.y_data_type)]
        data_field_forest.extend(DataField(name=x_i,
                                           optype='continuous',
                                           dataType='double')
                                 for x_i in self.formula.x)
        data_field_forest.extend(DataField(name=k,
                                           optype='categorical',
                                           dataType='string')
                                 for k in self.grouping_keys)

        self.data_dictionary = DataDictionary(
            *data_field_forest, numberOfFields=len(data_field_forest))

    def _build_model(self):
        """Build self.model.

        Without grouping keys this is the single model built from
        self.coef0. With grouping keys it is a MiningModel holding one
        segment per entry of self.grouped_coefs, selected by equality
        predicates on the grouping keys.
        """
        if self.grouping_keys == []:  # no grouping
            self.model = self._create_single_model(self.coef0)
            return

        # MiningSchema
        mining_field_forest = [MiningField(name=self.formula.y,
                                           usageType='predicted')]
        mining_field_forest.extend(
            MiningField(name=x_i) for x_i in self.formula.x)
        mining_field_forest.extend(
            MiningField(name=k) for k in self.grouping_keys)
        grouping_mining_schema = MiningSchema(*mining_field_forest)

        # Segmentation: one segment for each row in output table
        segment_forest = []
        for g in self.grouped_coefs:
            if len(g['coef']) == 0:
                # Groups without coefficients get no segment.
                continue
            predicate_forest = [SimplePredicate(field=k,
                                                value_=g[k],
                                                operator='equal')
                                for k in self.grouping_keys]
            if len(predicate_forest) == 1:
                predicate = predicate_forest[0]
            else:
                predicate = CompoundPredicate(*predicate_forest,
                                              booleanOperator='and')
            single_model = self._create_single_model(g['coef'])
            segment_forest.append(Segment(predicate, single_model))
        segmentation = Segmentation(*segment_forest,
                                    multipleModelMethod='selectFirst')

        # MiningModel
        self.model = MiningModel(grouping_mining_schema,
                                 segmentation,
                                 functionName=self.function)

    def _format(self, pmml):
        """Store a pretty-printed XML string for the given PMML in pmml_str.

        The first line produced by toprettyxml is replaced by a fixed
        XML declaration.
        """
        bds.SetDefaultNamespace(binding.Namespace)  # avoid ns:xxx prefix
        declaration = "<?xml version=\"1.0\" standalone=\"yes\"?>\n"
        xml = pmml.toDOM().toprettyxml(indent="  ")
        self.pmml_str = declaration + "\n".join(xml.split("\n")[1:])

    def query(self):
        """Validate, read and parse the summary table, then the model table."""
        # summary table
        self._validate_summary_table()
        self._query_summary_table()
        self._parse_summary()

        # output table
        self._validate_output_table()
        self._query_output_table()
        self._parse_output()

    def build(self):
        """Assemble the PMML document and store its text in pmml_str.

        Relies on the attributes set by query().
        """
        self._construct_predict_spec()
        self._construct_formula()

        self._build_header()
        self._build_data_dictionary()
        self._build_model()
        pmml = PMML(self.header, self.data_dictionary, self.model,
                    version='4.1')

        self._format(pmml)
| |
| |
class RegressionPMMLBuilder(PMMLBuilder):
    """Builds the PMML 'RegressionModel' element.

    Handles the 'linregr' (linear regression) and 'logregr' (logistic
    regression) model types listed in PREDICT_SPEC_DICT.
    """
    PREDICT_SPEC_DICT = {
        'logregr': {'function_name': 'classification',
                    'y_optype': 'categorical',
                    'y_data_type': 'boolean'},
        'linregr': {'function_name': 'regression',
                    'y_optype': 'continuous',
                    'y_data_type': 'double'}
    }
    OUTPUT_COLS = ['coef']
    SUMMARY_COLS = ['grouping_col', 'dependent_varname', 'independent_varname']

    def __init__(self, schema_madlib, model_type, model_table, name_spec):
        super(RegressionPMMLBuilder, self).__init__(
            schema_madlib, model_type, model_table, name_spec)
        # Built lazily by _build_mining_schema and then reused.
        self.mining_schema = None

    def _parse_summary(self):
        """Copy variable names and grouping column out of self.summary."""
        summary = self.summary
        self.y_str = summary['dependent_varname']
        self.x_str = summary['independent_varname']
        self.grouping_col = summary['grouping_col']
        # SELECT-list prefix: empty without grouping, 'cols,' otherwise.
        if self.grouping_col is None:
            self.grouping_str = ''
        else:
            self.grouping_str = self.grouping_col + ','

    def _query_output_table(self):
        """Load grouping columns (if any) and 'coef' from the model table."""
        output_query = """
            SELECT {grouping_str} coef
            FROM {model_table}
        """.format(grouping_str=self.grouping_str,
                   model_table=self.model_table)
        self.output = plpy.execute(output_query)

    def _parse_output(self):
        """Derive coefficients and grouping keys from the query result.

        Every column of the first row other than 'coef' is treated as a
        grouping key.
        """
        self.grouped_coefs = self.output
        first_row = self.output[0]
        self.coef0 = first_row['coef']
        self.n_coef = len(self.coef0)
        self.grouping_keys = [col for col in first_row if col != 'coef']

    def _construct_predict_spec(self):
        """Set function, target values, optype and data type for model_type."""
        spec = type(self).PREDICT_SPEC_DICT[self.model_type]
        self.predict_spec = spec
        self.function = spec['function_name']
        if self.function == 'classification':
            self.y_value_forest = [Value(value_='True'),
                                   Value(value_='False')]
        else:
            self.y_value_forest = []
        self.y_optype = spec['y_optype']
        self.y_data_type = spec['y_data_type']

    def _build_mining_schema(self):
        """Create self.mining_schema once: target first, then predictors."""
        if self.mining_schema is not None:
            return
        fields = [MiningField(name=self.formula.y, usageType='predicted')]
        for x_name in self.formula.x:
            fields.append(MiningField(name=x_name))
        self.mining_schema = MiningSchema(*fields)

    def _create_numeric_predictors(self, coef):
        """Return one NumericPredictor per coefficient, named by position."""
        return [NumericPredictor(name=self.formula.x[pos], coefficient=value)
                for pos, value in enumerate(coef)]

    def _create_model_regression(self, numeric_predictor_forest):
        """Return a RegressionModel with a single zero-intercept table."""
        table = RegressionTable(*numeric_predictor_forest, intercept='0')
        return RegressionModel(self.mining_schema,
                               table,
                               functionName=self.function)

    def _create_model_classification(self, numeric_predictor_forest):
        """Return a softmax RegressionModel with True and False tables.

        Only the True table carries the predictors; the False table is
        empty with intercept '0'.
        """
        true_table = RegressionTable(*numeric_predictor_forest,
                                     targetCategory=True, intercept='0')
        false_table = RegressionTable(targetCategory=False, intercept='0')
        return RegressionModel(self.mining_schema,
                               true_table,
                               false_table,
                               functionName=self.function,
                               normalizationMethod='softmax')

    def _create_single_model(self, coef):
        """Build the model for one coefficient vector.

        Returns None when self.function is neither 'regression' nor
        'classification'.
        """
        self._build_mining_schema()
        predictors = self._create_numeric_predictors(coef)

        if self.function == 'regression':
            return self._create_model_regression(predictors)
        if self.function == 'classification':
            return self._create_model_classification(predictors)
        return None
| |
| |
class GeneralRegressionPMMLBuilder(RegressionPMMLBuilder):
    """Shared pieces for builders of the PMML 'GeneralRegressionModel'.

    Each of ParameterList, CovariateList and PPMatrix is built at most
    once per builder and cached on the instance.
    """
    def __init__(self, schema_madlib, model_type, model_table, name_spec):
        super(GeneralRegressionPMMLBuilder, self).__init__(
            schema_madlib, model_type, model_table, name_spec)
        self.parameter_list = None
        self.covariate_list = None
        self.ppmatrix = None

    def _build_parameter_list(self):
        """Create the ParameterList: 'p0' (Intercept), then p1..pN."""
        if self.parameter_list is not None:
            return
        parameters = [Parameter(name='p0', label='Intercept')]
        for pos, x_name in enumerate(self.formula.x, 1):
            parameters.append(Parameter(name="p" + str(pos), label=x_name))
        self.parameter_list = ParameterList(*parameters)

    def _build_covariate_list(self):
        """Create the CovariateList with one Predictor per x variable."""
        if self.covariate_list is not None:
            return
        predictors = [Predictor(name=x_name) for x_name in self.formula.x]
        self.covariate_list = CovariateList(*predictors)

    def _build_ppmatrix(self):
        """Create the PPMatrix pairing the i-th x variable with 'p<i+1>'."""
        if self.ppmatrix is not None:
            return
        cells = [PPCell(value_="1",
                        predictorName=x_name,
                        parameterName="p" + str(pos))
                 for pos, x_name in enumerate(self.formula.x, 1)]
        self.ppmatrix = PPMatrix(*cells)
| |
| |
class GLMPMMLBuilder(GeneralRegressionPMMLBuilder):
    """Builder for PMML modelType='generalizedLinear'.

    The current implementation supports the GLM families Gaussian,
    Binomial, Gamma, Poisson and Inverse Gaussian, with the link
    functions listed in LINK_SPEC_DICT.
    """
    PREDICT_SPEC_DICT = {
        'gaussian': {'function_name': 'regression',
                     'y_optype': 'continuous',
                     'y_data_type': 'double',
                     'distribution': 'normal'},
        'binomial': {'function_name': 'classification',
                     'y_optype': 'categorical',
                     'y_data_type': 'boolean',
                     'distribution': 'binomial'},
        'gamma': {'function_name': 'regression',
                  'y_optype': 'continuous',
                  'y_data_type': 'double',
                  'distribution': 'gamma'},
        'poisson': {'function_name': 'regression',
                    'y_optype': 'continuous',
                    'y_data_type': 'integer',
                    'distribution': 'poisson'},
        'inverse_gaussian': {'function_name': 'regression',
                             'y_optype': 'continuous',
                             'y_data_type': 'double',
                             'distribution': 'igauss'}
    }
    LINK_SPEC_DICT = {
        'inverse': {'linkFunction': 'power',
                    'linkParameter': '-1'},
        'log': {'linkFunction': 'log'},
        'identity': {'linkFunction': 'identity'},
        'probit': {'linkFunction': 'probit'},
        'logit': {'linkFunction': 'logit'},
        'sqrt': {'linkFunction': 'power',
                 'linkParameter': '0.5'},
        'sqr_inverse': {'linkFunction': 'power',
                        'linkParameter': '-2'}
    }
    SUMMARY_COLS = ['grouping_col',
                    'dependent_varname',
                    'independent_varname',
                    'family_params'
                    ]

    def __init__(self, schema_madlib, model_type, model_table, name_spec):
        super(GLMPMMLBuilder, self).__init__(
            schema_madlib, model_type, model_table, name_spec)

    def _parse_summary(self):
        """Read names, grouping column, family and link from self.summary.

        Raises KeyError if the link is not a key of LINK_SPEC_DICT.
        """
        summary = self.summary
        self.y_str = summary['dependent_varname']
        self.x_str = summary['independent_varname']
        self.grouping_col = summary['grouping_col']
        if self.grouping_col is None:
            self.grouping_str = ''
        else:
            self.grouping_str = self.grouping_col + ','

        params = extract_family_params(self.schema_madlib,
                                       summary['family_params'])
        self.family_params_dict = params
        self.family = params['family']
        self.link = params['link']
        self.link_spec = type(self).LINK_SPEC_DICT[self.link]

    def _construct_predict_spec(self):
        """Set function, target values, optype, distribution and data type.

        The lookup is keyed on the family parsed from the summary table.
        """
        spec = type(self).PREDICT_SPEC_DICT[self.family]
        self.predict_spec = spec
        self.function = spec['function_name']
        binomial_classifier = (self.function == 'classification' and
                               self.family == 'binomial')
        if binomial_classifier:
            self.y_value_forest = [Value(value_='True'),
                                   Value(value_='False')]
        else:
            self.y_value_forest = []
        self.y_optype = spec['y_optype']
        self.distribution = spec['distribution']
        self.y_data_type = spec['y_data_type']

    def _create_single_model(self, coef):
        """Return a GeneralRegressionModel for one coefficient vector.

        The intercept parameter 'p0' always gets beta '0'; the i-th
        coefficient becomes parameter 'p<i+1>'. For classification every
        PCell is tagged with targetCategory=True.
        """
        self._build_mining_schema()
        self._build_parameter_list()
        self._build_covariate_list()
        self._build_ppmatrix()

        tag_target = self.function == 'classification'

        def make_pcell(parameter_name, beta):
            attributes = dict(parameterName=parameter_name, beta=beta, df='1')
            if tag_target:
                attributes['targetCategory'] = True
            return PCell(**attributes)

        # pcells
        pcell_forest = [make_pcell('p0', '0')]
        for pos, value in enumerate(coef, 1):
            pcell_forest.append(make_pcell("p" + str(pos), value))

        return GeneralRegressionModel(self.mining_schema,
                                      self.parameter_list,
                                      FactorList(),
                                      self.covariate_list,
                                      self.ppmatrix,
                                      ParamMatrix(*pcell_forest),
                                      targetVariableName=self.formula.y,
                                      modelType='generalizedLinear',
                                      distribution=self.distribution,
                                      functionName=self.function,
                                      **self.link_spec)
| |
| |
class MultiClassRegressionPMMLBuilder(GeneralRegressionPMMLBuilder):
    """Common base for the multinomial logistic and ordinal builders.

    Subclasses must set self.cate_list before _construct_predict_spec runs.
    """
    def __init__(self, schema_madlib, model_type, model_table, name_spec):
        super(MultiClassRegressionPMMLBuilder, self).__init__(
            schema_madlib, model_type, model_table, name_spec)

    def _construct_predict_spec(self):
        """Declare a categorical string target with one Value per category."""
        self.function = 'classification'
        categories = []
        for category in self.cate_list:
            categories.append(Value(value_=category))
        self.y_value_forest = categories
        self.y_optype = 'categorical'
        self.y_data_type = 'string'
| |
| |
class OrdinalRegressionPMMLBuilder(MultiClassRegressionPMMLBuilder):
    """Builder for PMML modelType='ordinalMultinomial'.

    The signs of feature coefficients are different between PMML and the
    madlib.ordinal() output, so feature coefficients are negated on export.
    """
    OUTPUT_COLS = ['coef_threshold', 'coef_feature']
    SUMMARY_COLS = ['grouping_col',
                    'dependent_varname',
                    'independent_varname',
                    'link_func',
                    'category_list']

    def __init__(self, schema_madlib, model_type, model_table, name_spec):
        super(OrdinalRegressionPMMLBuilder, self).__init__(
            schema_madlib, model_type, model_table, name_spec)

    def _parse_summary(self):
        """Read names, grouping column, link and categories from the summary.

        'category_list' is split on commas without trimming.
        """
        summary = self.summary
        self.y_str = summary['dependent_varname']
        self.x_str = summary['independent_varname']
        self.grouping_col = summary['grouping_col']
        if self.grouping_col is None:
            self.grouping_str = ''
        else:
            self.grouping_str = self.grouping_col + ','
        self.link = summary['link_func']
        self.cate_list = summary['category_list'].split(',')

    def _query_output_table(self):
        """Load the coefficients from the model table.

        'coef' is coef_feature followed by coef_threshold; coef_feature is
        also selected on its own so its length can be read later.
        """
        self.output_query = """
            SELECT
                {grouping_str}
                coef_feature || coef_threshold as coef,
                coef_feature
            FROM {model_table}
        """.format(grouping_str=self.grouping_str,
                   model_table=self.model_table)
        self.output = plpy.execute(self.output_query)

    def _parse_output(self):
        """Derive coefficients, feature count and grouping keys.

        n_coef counts feature coefficients only, not thresholds.
        """
        self.grouped_coefs = self.output
        first_row = self.output[0]
        self.coef0 = first_row['coef']
        self.n_coef = len(first_row['coef_feature'])
        non_grouping = ('coef', 'coef_feature')
        self.grouping_keys = [col for col in first_row
                              if col not in non_grouping]

    def _create_single_model(self, coef):
        """Return a GeneralRegressionModel for one combined coefficient vector.

        The first formula.n_coef entries are feature coefficients and the
        rest are thresholds. The i-th threshold becomes a 'p0' cell for
        category cate_list[i].
        """
        coef_threshold = coef[self.formula.n_coef:]
        coef_feature = coef[:self.formula.n_coef]

        self._build_mining_schema()
        self._build_parameter_list()
        self._build_covariate_list()
        self._build_ppmatrix()

        # pcells: thresholds first, then the sign-flipped feature coefficients
        pcell_forest = [PCell(parameterName='p0',
                              beta=threshold,
                              df='1',
                              targetCategory=self.cate_list[pos])
                        for pos, threshold in enumerate(coef_threshold)]
        pcell_forest.extend(
            PCell(parameterName="p" + str(pos), beta=-value, df='1')
            for pos, value in enumerate(coef_feature, 1))

        return GeneralRegressionModel(self.mining_schema,
                                      self.parameter_list,
                                      FactorList(),
                                      self.covariate_list,
                                      self.ppmatrix,
                                      ParamMatrix(*pcell_forest),
                                      targetVariableName=self.formula.y,
                                      modelType='ordinalMultinomial',
                                      functionName=self.function,
                                      cumulativeLink=self.link)
| |
| |
class MultinomRegressionPMMLBuilder(MultiClassRegressionPMMLBuilder):
    """Builder class for PMML modelType='multinomialLogistic'.

    The model table holds one row per (group, category). Rows are folded
    into one ``{category: coef}`` dict per group before models are built.
    """
    OUTPUT_COLS = ['category', 'coef']
    SUMMARY_COLS = ['grouping_col',
                    'dependent_varname',
                    'independent_varname',
                    'category_list',
                    'ref_category']

    def __init__(self, schema_madlib, model_type, model_table, name_spec):
        MultiClassRegressionPMMLBuilder.__init__(
            self, schema_madlib, model_type, model_table, name_spec)

    def _parse_summary(self):
        """Read names, grouping column and category info from the summary.

        'category_list' is split on commas without trimming.
        """
        self.y_str = self.summary['dependent_varname']
        self.x_str = self.summary['independent_varname']
        self.grouping_col = self.summary['grouping_col']
        self.grouping_str = ('' if self.grouping_col is None
                             else self.grouping_col + ',')
        self.cate_list = self.summary['category_list'].split(',')
        self.ref_cate = self.summary['ref_category']

    def _query_output_table(self):
        """Load grouping columns (if any), category and coef per row."""
        self.output_query = """
            SELECT
                {grouping_str}
                category,
                coef
            FROM {model_table}
        """.format(**self.__dict__)
        self.output = plpy.execute(self.output_query)

    def _parse_output(self):
        """Group the per-category rows by their grouping-key values.

        Sets grouping_keys, grouped_coefs (one dict per group holding the
        grouping values plus 'coef' -> {category: coef}), coef0 (the
        mapping of one group) and n_coef (length of one coefficient
        vector of that group).
        """
        self.grouping_keys = [k for k in self.output[0]
                              if k not in ('coef', 'category')]
        coef_dict = defaultdict(dict)
        for r in self.output:
            # extract grouping values as key
            grp_val = tuple(r[k] for k in self.grouping_keys)
            # grouping values -> { category -> coef}
            coef_dict[grp_val][r['category']] = r['coef']

        # items()/next(iter(...)) instead of iteritems()/values()[0]:
        # same result on Python 2, and also valid on Python 3 where dict
        # views cannot be indexed and zip() no longer returns a list.
        self.grouped_coefs = []
        for grp_val, coef in coef_dict.items():
            entry = dict(zip(self.grouping_keys, grp_val))
            entry['coef'] = coef
            self.grouped_coefs.append(entry)
        self.coef0 = next(iter(coef_dict.values()))
        self.n_coef = len(next(iter(self.coef0.values())))

    def _create_single_model(self, coef):
        """Return a GeneralRegressionModel for one {category: coef} mapping.

        Each category gets a 'p0' cell with beta 0 followed by one cell
        per coefficient, all tagged with that category.
        """
        self._build_mining_schema()
        self._build_parameter_list()
        self._build_covariate_list()
        self._build_ppmatrix()

        # pcells
        pcell_forest = []
        for cate, coef_per_cate in coef.items():
            pcell_forest.append(PCell(parameterName="p0",
                                      beta=0, df='1', targetCategory=cate))
            for i, c in enumerate(coef_per_cate):
                pcell_forest.append(PCell(parameterName="p" + str(i + 1),
                                          beta=c, df='1',
                                          targetCategory=cate))

        return GeneralRegressionModel(self.mining_schema,
                                      self.parameter_list,
                                      FactorList(),
                                      self.covariate_list,
                                      self.ppmatrix,
                                      ParamMatrix(*pcell_forest),
                                      targetVariableName=self.formula.y,
                                      modelType='multinomialLogistic',
                                      functionName=self.function)
| |
| |
class DecisionTreePMMLBuilder(PMMLBuilder):
    """Builder class for PMML model 'TreeModel'.

    The tree is read from the model table as a flattened "frame" matrix
    plus a matrix of split thresholds (see madlib.rpart) and converted
    into nested PMML Node elements.
    """
    OUTPUT_COLS = ['tree']
    SUMMARY_COLS = ['grouping_cols', 'dependent_varname', 'independent_varnames',
                    'cat_features', 'con_features', 'is_classification',
                    'dependent_var_levels', 'dependent_var_type', 'independent_var_types']
    # Database type name (lower case) -> PMML dataType.
    DATA_TYPE_DICT = {
        'boolean': 'boolean',
        'char': 'string',
        'character': 'string',
        'varchar': 'string',
        'character varying': 'string',
        'text': 'string',
        'smallint': 'integer',
        'int2': 'integer',
        'integer': 'integer',
        'int4': 'integer',
        'int': 'integer',
        'bigint': 'integer',
        'int8': 'integer',
        'float8': 'double',
        'double precision': 'double'
    }

    def __init__(self, schema_madlib, model_type, model_table, name_spec):
        PMMLBuilder.__init__(self, schema_madlib, model_type, model_table, name_spec)
        # Built lazily by _build_mining_schema and then reused.
        self.mining_schema = None

    def _parse_summary(self):
        """Read variable names, feature kinds and data types from the summary.

        Raises KeyError for a type name missing from DATA_TYPE_DICT.
        """
        self.y_str = self.summary['dependent_varname']
        # assume that summary table sort the independent varnames (cat, con)
        self.x_str = self.summary['independent_varnames']
        self.x = [s.strip() for s in self.x_str.split(',')]
        self.n_coef = len(self.x)

        self.grouping_col = self.summary['grouping_cols']
        self.grouping_str = ('' if self.grouping_col is None
                             else self.grouping_col + ',')

        self.cat_features = [s.strip()
                             for s in self.summary['cat_features'].split(',')
                             if s.strip() != '']
        self.con_features = [s.strip()
                             for s in self.summary['con_features'].split(',')
                             if s.strip() != '']
        self.n_cats = len(self.cat_features)

        self.is_classification = self.summary['is_classification']
        if self.is_classification:
            # remove possible quotes around the values
            self.y_levels = [s.strip().strip("\"")
                             for s in self.summary['dependent_var_levels'].split(',')]

        self.y_data_type = self.summary['dependent_var_type']
        self.y_data_type = self.__class__.DATA_TYPE_DICT[self.y_data_type.lower()]

        # figure out whether each x is categorical or continuous
        self.x_optype = ['categorical' if x in self.cat_features else 'continuous'
                         for x in self.x]
        self.x_data_type = [
            self.__class__.DATA_TYPE_DICT[s.strip().lower()]
            for s in self.summary['independent_var_types'].split(',')]

    # load the tree frame into memory
    def _query_output_table(self):
        """
        Re-use _convert_to_rpart_format and _get_split_thresholds
        functions to extract the tree information, and store in memory.

        Both arrays are fetched as comma-joined strings and decoded later
        by _get_one_frame_matrix and _get_one_thresh_matrix.
        """
        output_query = """
            SELECT
                {grouping_str}
                array_to_string(
                    {schema_madlib}._convert_to_rpart_format(
                        tree,
                        {n_cats}
                    ),
                ',') AS frame,
                cat_levels_in_text,
                cat_n_levels,
                array_to_string(
                    {schema_madlib}._get_split_thresholds(
                        tree,
                        {n_cats}
                    ),
                ',') AS thresholds
            FROM
                {model_table}
        """.format(**self.__dict__)
        self.output = plpy.execute(output_query)

    def _parse_output(self):
        """Repackage each result row as grouping values plus a 'coef' dict.

        Every column other than the four tree columns is treated as a
        grouping key.
        """
        tree_cols = ['frame', 'cat_levels_in_text',
                     'cat_n_levels', 'thresholds']
        self.grouped_coefs = []
        for g in self.output:
            entry = {'coef': {'frame': g['frame'],
                              'cat_levels': g['cat_levels_in_text'],
                              'cat_n': g['cat_n_levels'],
                              'thresholds': g['thresholds']}}
            entry.update((k, v) for k, v in g.items() if k not in tree_cols)
            self.grouped_coefs.append(entry)
        self.coef0 = self.grouped_coefs[0]['coef']
        self.grouping_keys = [k for k in self.output[0]
                              if k not in tree_cols]

    def _construct_predict_spec(self):
        " Construct model attributes "
        self.y_value_forest = []
        if self.is_classification:
            self.y_value_forest.extend(Value(value_=s.strip('"')) for s in self.y_levels)
        self.function = 'classification' if self.is_classification else 'regression'
        self.y_optype = 'categorical' if self.is_classification else 'continuous'

    def _build_mining_schema(self):
        """Create self.mining_schema once: target first, then every x."""
        if self.mining_schema is None:
            mining_field_forest = [MiningField(name=self.formula.y,
                                               usageType='predicted')]
            mining_field_forest.extend(MiningField(name=x_i) for x_i in self.x)
            self.mining_schema = MiningSchema(*mining_field_forest)

    def _create_single_model(self, coef0):
        """Return the TreeModel for one tree description dict."""
        self._build_mining_schema()     # construct MiningSchema
        node = self._build_node(coef0)  # construct Node tree

        return TreeModel(self.mining_schema, node,
                         functionName=self.function,
                         algorithmName='cart',
                         splitCharacteristic='binarySplit')

    # Override
    def _build_data_dictionary(self):
        """
        The independent variables can be either categorical or
        continuous.

        Also overwrites self.formula.x with the names parsed from the
        summary table.
        """
        # Use correct x names before constructing Formula
        self.formula.x = self.x

        data_field_forest = [DataField(*self.y_value_forest,
                                       name=self.formula.y,
                                       optype=self.y_optype,
                                       dataType=self.y_data_type)]

        data_field_forest.extend(DataField(name=self.x[i],
                                           optype=self.x_optype[i],
                                           dataType=self.x_data_type[i])
                                 for i in range(len(self.x)))

        data_field_forest.extend(DataField(name=k,
                                           optype='categorical',
                                           dataType='string')
                                 for k in self.grouping_keys)

        self.data_dictionary = DataDictionary(*data_field_forest,
                                              numberOfFields=len(data_field_forest))

    def _build_node(self, coef):
        """ Construct the hierarchy of nodes
        - Use coef['frame'], which is the frame matrix in madlib.rpart.
        - Use a method similar to madlib.rpart to parse the tree structure

        For the meaning of each column of the frame matrix, see the manual
        of rpart and madlib.rpart

        coef is a dict with keys 'frame', 'cat_levels', 'cat_n' and
        'thresholds', as produced by _parse_output. Returns the root Node.
        """
        # all node information, see madlib.rpart, double[][]
        frame = self._get_one_frame_matrix(coef['frame'])
        row_id = self._frame_row_id(frame)
        id_to_row = dict((rid, i) for i, rid in enumerate(row_id))
        # Not used below; kept so that a split index outside self.x still
        # fails here, as it did before.
        split_var = self._frame_row_names(frame)

        cat_n = coef['cat_n']  # number of categorical levels, int[]
        # categorical variable levels, string[][]
        cat_levels = self._get_cat_levels(coef['cat_levels'], cat_n)
        # split values, double[][], including surrogates
        thresh = self._get_one_thresh_matrix(coef['thresholds'])

        # Map from row number to indices of the primary and all surrogates
        row_to_thresh = {}
        curr_thresh_index = 0
        for row in range(len(frame)):
            if frame[row][0] >= 0:
                n_surr = int(frame[row][7])
                row_to_thresh[row] = range(curr_thresh_index,
                                           1 + n_surr + curr_thresh_index)
                curr_thresh_index += 1 + n_surr

        # Construct a predicate from i-th row of frame
        # For continuous variables, we use SimplePredicate
        # However, for categorical variables, we have to use CompoundPredicate
        # with the booleanOperator="or"
        def construct_predicate(i):
            if i == 0:  # root node always True
                return True_()
            # Children of id r are 2r+1 and 2r+2, so floor((id - 1) / 2) is
            # the parent id. '//' keeps the key an int on Python 2 and 3.
            parent = id_to_row[(row_id[i] - 1) // 2]
            is_left = 2 * row_id[parent] + 1 == row_id[i]
            sibling = right_node(parent) if is_left else left_node(parent)

            thresh_indices = row_to_thresh[parent]
            predicate_forests = [
                construct_predicate_using_thresh_row(
                    thresh[thresh_index],
                    is_left,
                    (self.x[thresh[thresh_index][0]] in self.cat_features))
                for thresh_index in thresh_indices]

            # Final fallback of the surrogate chain: True for whichever
            # child has the larger frame column 2, ties going to the right.
            is_majority = (frame[i][2] > frame[sibling][2] or
                           (frame[i][2] == frame[sibling][2] and not is_left))
            if is_majority:
                predicate_forests.append(True_())
            else:
                predicate_forests.append(False_())

            return CompoundPredicate(*predicate_forests,
                                     booleanOperator='surrogate')

        # construct a predicate from a row in thresh
        def construct_predicate_using_thresh_row(thresh_row, is_left,
                                                 is_split_categorical):
            """consume a row in thresh

            NOTE: always compound with False due to PyXB bug which loses the
            order of internal predicates if some are compound and others are not
            """

            if is_split_categorical:
                levels = cat_levels[thresh_row[0]]
                if is_left:
                    selected_level_indices = range(int(thresh_row[1] + 1), len(levels))
                else:
                    selected_level_indices = range(0, int(thresh_row[1] + 1))
                simple_predicates = [
                    SimplePredicate(field=self.x[thresh_row[0]],
                                    operator="equal",
                                    value_=levels[l])
                    for l in selected_level_indices]

                if len(simple_predicates) == 1:
                    simple_predicates.append(False_())
            else:
                op = 'greaterThan' if is_left else 'lessOrEqual'

                simple_predicates = [SimplePredicate(field=self.x[thresh_row[0]],
                                                     operator=op,
                                                     value_=thresh_row[1]),
                                     False_()]

            return CompoundPredicate(*simple_predicates, booleanOperator="or")

        # compute the score of the node
        def compute_node_score(i):
            if self.is_classification:
                n_y_level = len(self.y_levels)
                # The level whose entry in columns s .. s+n_y_level-1 is
                # largest wins; the first level wins ties.
                s = 9 + n_y_level
                max_prob = 0
                max_level = 0
                for j in range(s, s + n_y_level):
                    if frame[i][j] > max_prob:
                        max_prob = frame[i][j]
                        max_level = j - s
                return self.y_levels[max_level]
            else:
                return frame[i][4]

        # compute categorical score distributions
        # only for classification
        def compute_score_distributions(i):
            n_y_level = len(self.y_levels)
            s = 9
            ans = []
            for j in range(s, s + n_y_level):
                ans.append(ScoreDistribution(
                    value_=self.y_levels[j - s].strip('"'),
                    recordCount=int(frame[i][j])))
            return ans

        # get the row number of left son
        def left_node(i):
            return id_to_row[2 * row_id[i] + 1]

        # get the row number of right son
        def right_node(i):
            return id_to_row[2 * row_id[i] + 2]

        # Recursively construct the i-th node using the i-th row of frame
        def construct_node(i):
            predicate = construct_predicate(i)
            score = compute_node_score(i)
            record_count = frame[i][1]
            choice = []

            if self.is_classification:
                choice.extend(compute_score_distributions(i))

            if frame[i][0] >= 0:  # splitting node
                choice.extend([construct_node(left_node(i)),
                               construct_node(right_node(i))])

            return Node(predicate, *choice,
                        id=row_id[i],
                        score=score,
                        recordCount=int(record_count))

        return construct_node(0)

    # Helper function used only by _build_node
    def _get_one_frame_matrix(self, frame_string):
        """
        frame from the _query_output_table is an array, convert it
        into a matrix format to simplify the following processing.

        frame_string is a comma-separated list of numbers in column-major
        order. Returns a list of rows of floats with
        10 + 2 * len(y_levels) columns for classification and 8 otherwise.
        """
        frame_vector = [float(x) for x in frame_string.split(',')]
        n = len(frame_vector)
        ncol = 10 + 2 * len(self.y_levels) if self.is_classification else 8
        # Floor division: a row count must be an int for range() on both
        # Python 2 and Python 3.
        nrow = n // ncol

        # column wise assignment: element (i, j) sits at offset j * nrow + i
        return [[frame_vector[j * nrow + i] for j in range(ncol)]
                for i in range(nrow)]

    # Helper function used only by _build_node
    def _get_one_thresh_matrix(self, thresh_string):
        """
        thresh from the _query_output_table is an array, convert it
        into a matrix format to simplify the following processing.

        thresh_string is a comma-separated, column-major list of numbers
        (possibly empty). Returns rows of [int, float]; the first column
        is later used as an index into self.x.
        """
        thresh_vector = ([] if thresh_string == ''
                         else [float(x) for x in thresh_string.split(',')])
        ncol = 2
        # Floor division keeps nrow an int on both Python 2 and Python 3.
        nrow = len(thresh_vector) // ncol

        # column wise assignment: first nrow values are column 0
        return [[int(thresh_vector[i]), thresh_vector[nrow + i]]
                for i in range(nrow)]

    # Helper function used only by _build_node
    # Extract row_id from the frame matrix. See madlib.rpart
    # for more details.
    def _frame_row_id(self, frame_matrix):
        """Return, for each frame row, its node id.

        Rows are visited in order as a pre-order walk: the root has id 0
        and a node with id r has children 2r+1 (left) and 2r+2 (right).
        A row whose first column is negative is a leaf.
        """
        nrow = len(frame_matrix)
        row_id = [0 for i in range(nrow)]

        def compute_id(c, r):
            # c is a one-element list used as a mutable row cursor.
            # Check the bound before writing; previously the write came
            # first, so running past the last row raised IndexError
            # instead of stopping.
            if c[0] >= nrow:
                return
            row_id[c[0]] = r
            is_leaf = frame_matrix[c[0]][0] < 0
            c[0] += 1
            if is_leaf:
                return
            compute_id(c, 2 * r + 1)  # left
            compute_id(c, 2 * r + 2)  # right

        count = [0]
        compute_id(count, 0)
        return row_id

    # Helper function used only by _build_node
    def _frame_row_names(self, frame_matrix):
        " The name of the variable used for splitting node "
        nrow = len(frame_matrix)
        return [None if frame_matrix[i][0] < 0
                else self.x[int(round(frame_matrix[i][0]))]
                for i in range(nrow)]

    # Helper function used only by _build_node
    def _get_cat_levels(self, cat_level_vector, cat_n):
        """Split the flat level list into one list per categorical variable.

        cat_n gives the number of levels of each variable in order.
        Returns [] when cat_n is None.
        """
        if cat_n is None:
            return []
        cat_level_array = []
        start = 0
        for n in cat_n:
            # Indexing (not slicing) so a too-short vector still raises.
            cat_level_array.append(
                [cat_level_vector[start + k] for k in range(n)])
            start += n
        return cat_level_array
| |
| |
| class RandomForestPMMLBuilder(DecisionTreePMMLBuilder): |
| """Builder class for PMML model 'ForestModel' |
| """ |
def __init__(self, schema_madlib, model_type, model_table, name_spec):
    """Initialise via the parent builder and reset the mining schema.

    All four arguments are forwarded unchanged to the parent
    constructor; ``self.mining_schema`` starts out as None.
    """
    super(RandomForestPMMLBuilder, self).__init__(
        schema_madlib, model_type, model_table, name_spec)
    self.mining_schema = None
| |
def _parse_summary(self):
    """Unpack the summary-table row in ``self.summary`` into attributes.

    Sets the dependent/independent variable names, grouping information,
    categorical and continuous feature lists, the classification flag
    (plus ``y_levels`` for classification models), the mapped PMML data
    types, the per-feature optype and the name of the ``_group`` table.

    Raises:
        KeyError: if a type named in the summary is missing from the
            class's DATA_TYPE_DICT, or an expected summary column is
            absent.
    """
    summary = self.summary
    type_map = self.__class__.DATA_TYPE_DICT

    def split_names(csv_text):
        # Either feature list may be empty when the model only has one
        # kind of feature; a NULL value is treated like an empty string
        # (NOTE(review): confirm whether the summary table can hold NULL).
        return [s.strip() for s in (csv_text or '').split(',')
                if s.strip() != '']

    self.y_str = summary['dependent_varname']
    # assume that the summary table sorts the independent varnames
    # (cat, con)
    self.x_str = summary['independent_varnames']
    self.x = [s.strip() for s in self.x_str.split(',')]
    self.n_coef = len(self.x)

    self.grouping_col = summary['grouping_cols']
    self.grouping_str = ('' if self.grouping_col is None
                         else self.grouping_col + ',')

    self.cat_features = split_names(summary['cat_features'])
    self.con_features = split_names(summary['con_features'])
    self.n_cats = len(self.cat_features)

    self.is_classification = summary['is_classification']
    if self.is_classification:
        # remove possible quotes around the values
        self.y_levels = [s.strip().strip("\"")
                         for s in summary['dependent_var_levels'].split(',')]

    self.y_data_type = type_map[summary['dependent_var_type'].lower()]

    # figure out whether each x is categorical or continuous
    categorical = set(self.cat_features)
    self.x_optype = ['categorical' if name in categorical else 'continuous'
                     for name in self.x]
    # Derive the group table name with add_postfix, the same helper the
    # file uses for the '_summary' table, rather than raw concatenation.
    self.model_table_group = add_postfix(self.model_table, '_group')

    self.x_data_type = [type_map[t.strip().lower()]
                        for t in summary['independent_var_types'].split(',')]
| |
def _query_output_table(self):
    """Fetch every tree of the forest and keep the rows in ``self.output``.

    Joins the model table with its group table on ``gid`` and, for each
    tree, selects the rpart-format frame and the split thresholds (both
    flattened to comma-separated strings) together with the categorical
    level columns. The placeholders in the query are filled from this
    instance's attributes.
    """
    tree_query = """
        SELECT
            {grouping_str}
            gid,
            array_to_string(
                {schema_madlib}._convert_to_rpart_format(
                    tree,
                    {n_cats}
                    ),
                ',') AS frame,
            cat_levels_in_text,
            cat_n_levels,
            array_to_string(
                {schema_madlib}._get_split_thresholds(
                    tree,
                    {n_cats}
                    ),
                ',') AS thresholds
        FROM
            {model_table}
            JOIN
            {model_table_group}
            using (gid)
        """
    rendered_query = tree_query.format(**vars(self))
    self.output = plpy.execute(rendered_query)
| |
| def _parse_output(self): |
| self.gids = [g['gid'] for g in self.output] |
| self.grouped_coefs = [{'coef': []} for index in range(max(self.gids))] |
| # grouped_coefs groups trees based on gid |
| # difference from decision trees is that, key 'coef' |
| # points to a list of dictionaries instead of a single dictionary. |
| # each dictionary contains information on one tree of the forest. |
| for g in self.output: |
| gid = g['gid'] |
| dl = self.grouped_coefs[gid-1]['coef'] |
| if dl is None: |
| dl = [] |
| dl.append({'frame': g['frame'], |
| 'cat_levels': g['cat_levels_in_text'], |
| 'cat_n': g['cat_n_levels'], |
| 'thresholds': g['thresholds'], |
| 'gid': g['gid']}) |
| self.grouped_coefs[gid - 1]['coef'] = dl |
| self.grouped_coefs[gid - 1].update( |
| dict((k, v) for k, v in g.items() |
| if k not in ['frame', 'cat_levels_in_text', |
| 'cat_n_levels', 'thresholds', 'gid'])) |
| |
| self.coef0 = self.grouped_coefs[0]['coef'] |
| self.grouping_keys = [k for k in self.output[0] |
| if k not in ['frame', 'cat_levels_in_text', |
| 'cat_n_levels', 'thresholds', 'gid']] |
| |
def _create_single_model(self, coef0):
    """Build the PMML MiningModel for one group's forest.

    Args:
        coef0: list of per-tree dicts, each passed to ``_build_node``.

    Returns:
        A MiningModel whose Segmentation holds one always-true Segment
        per tree. The trees are combined with 'majorityVote' for
        classification and 'average' otherwise.
    """
    self._build_mining_schema()  # construct MiningSchema

    def as_segment(tree_spec):
        # Build the Node tree first, then wrap it in a TreeModel segment.
        root = self._build_node(tree_spec)
        tree_model = TreeModel(self.mining_schema, root,
                               functionName=self.function,
                               algorithmName='randomForest_model',
                               splitCharacteristic='binarySplit')
        return Segment(True_(), tree_model)

    segments = [as_segment(tree_spec) for tree_spec in coef0]

    if self.is_classification:
        combine_method = 'majorityVote'
    else:
        combine_method = 'average'

    forest = Segmentation(*segments, multipleModelMethod=combine_method)
    return MiningModel(self.mining_schema, forest,
                       functionName=self.function)