| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import plpy |
| from utilities.utilities import __mad_version |
| from utilities.utilities import unique_string |
| from utilities.utilities import num_samples |
| from utilities.utilities import add_postfix |
| from validation.cv_utils import __cv_produce_col_name_string as _cv_col_string |
| from validation.cv_utils import __cv_validation_rows as _cv_validation_rows |
| |
| from math import sqrt |
| from collections import namedtuple |
| from operator import itemgetter |
| from itertools import product, repeat, chain |
| |
| version_wrapper = __mad_version() |
| mad_vec = version_wrapper.select_vecfunc() |
| |
| |
| def _extract_data_cols(y, x): |
| """ Extract independent data columns from ARRAY[...] and combine it with dependent data column |
| |
| Parameters |
| ========== |
| y : string |
| The dependent data column |
| |
| x : string |
| A string of such form: ARRAY[1,?...] where ? indicate 0 or 1 repetitions of the preceding sequence |
| |
| Returns |
| ======= |
| columns : a list of strings corresponding to column names. |
| """ |
| if not x.startswith('ARRAY'): |
| return [y] + x.split(',') |
| import re |
| m = re.search(r'\[((1,)?(?P<cols>.+))\]', x) |
| if not m: |
| plpy.error("SVM error: invalid ({0}) " |
| "for cross validation!".format(x)) |
| return [y] + m.group('cols').split(',') |
| |
| |
| class ValidationResult(object): |
| """Wrapper class for cross validation results |
| |
| Parameters |
| ========== |
| cv_history : list, optional |
| List of dictionaries. |
| Each dictionary contains the following three keys: |
| |
| - mean_score: float, average of scores using sub_args |
| - std_dev_score: float, standard deviation of scores using sub_args |
| - sub_args: dict, the values of arguments being validated |
| """ |
| def __init__(self, cv_history=None): |
| if cv_history is None: |
| cv_history = [] |
| self._cv_history = cv_history |
| |
| def _get_leafs(self, sub_args): |
| def _run(sub_args): |
| a = [] |
| for k, v in sub_args.iteritems(): |
| if isinstance(v, dict): |
| a.extend(_run(v)) |
| else: |
| a.append((k, v)) |
| return a |
| return _run(sub_args) |
| |
| def _flatten(self): |
| a = [] |
| for h in self._cv_history: |
| h = dict(h) |
| sub_args = h.pop('sub_args') |
| h.update(dict(self._get_leafs(sub_args))) |
| a.append(h) |
| return a |
| |
| def add_one(self, mean, std, sub_args): |
| """Add one record to the history""" |
| record = dict(mean_score=mean, std_dev_score=std, sub_args=sub_args) |
| self._cv_history.append(record) |
| |
| def sorted(self): |
| """Sort the history w.r.t. mean value and return a new ValidationResult object""" |
| ch = sorted(self._cv_history, |
| reverse=True, |
| key=itemgetter('mean_score')) |
| return ValidationResult(ch) |
| |
| def first(self, attr=None): |
| """Return the attribute of the first record of history |
| |
| Parameters |
| ========== |
| attr : string, optional |
| Any string in {'mean_score', 'std_dev_score', 'sub_args'} or None |
| |
| Returns |
| ======= |
| record : dict or float. |
| The return value is either the first record, or the value of the corresponding attr in the first record. |
| """ |
| if attr is None: |
| return self._cv_history[0] |
| else: |
| return self._cv_history[0].get(attr) |
| |
| def top(self, attr=None): |
| """Return the first after sort""" |
| svr = self.sorted() |
| return svr.first(attr) |
| |
| # output cv results as a SQL table |
| def output_tbl(self, tbl_name): |
| """Create a table tbl_name that contains the history |
| |
| The columns of tbl_name are mean_score, std_dev_score and the leaf keys in sub_args. |
| All column types are assumed to be double precision. |
| """ |
| if not tbl_name or not str(tbl_name).strip(): |
| return |
| header = (self._cv_history[0]['sub_args'].keys() + |
| ['mean_score', 'std_dev_score']) |
| header_str = ','.join(map(str, header)) |
| |
| data = [] |
| for h in self._flatten(): |
| values = ','.join([str(h[k]) for k in header]) |
| data.append("({0})".format(values)) |
| data = ','.join(data) |
| plpy.execute(""" |
| CREATE TABLE {tbl_name} ({header_str}) AS |
| VALUES |
| {data} |
| """. |
| format(tbl_name=tbl_name, header_str=header_str, data=data)) |
| |
| |
| class _ValidationArgs(object): |
| """docstring for _ValidationArgs""" |
| def __init__(self, args): |
| self._args = args |
| |
| @classmethod |
| def grid(cls, sub_args): |
| def combine_dict(dicts): |
| # same as dict((k, v) for d in dicts for k, v in d.iteritems()) |
| return dict(chain.from_iterable(d.iteritems() for d in dicts)) |
| |
| def make_dicts(k, vs): |
| return [dict([t]) for t in zip(repeat(k), vs)] |
| |
| a = [] |
| for k, v in sub_args.iteritems(): |
| if isinstance(v, list): |
| a.append(make_dicts(k, v)) |
| elif isinstance(v, dict): |
| a.append(make_dicts(k, cls.grid(v))) |
| tuples = product(*a) |
| return map(combine_dict, tuples) |
| |
| def make_from(self, **kwargs): |
| def _update_dict(d1, d2): |
| if not isinstance(d1, dict): |
| raise TypeError("{0} is not dict".format(type(d1))) |
| if not isinstance(d2, dict): |
| raise TypeError("{0} is not dict".format(type(d2))) |
| for k, v in d2.iteritems(): |
| if (isinstance(v, dict) and |
| k in d1 and |
| isinstance(d1[k], dict)): |
| _update_dict(d1[k], v) |
| else: |
| d1[k] = v |
| args = dict(self._args) |
| _update_dict(args, kwargs) |
| return args |
| |
| |
| def _cv_copy_data(rel_origin, dependent_varname, |
| independent_varname, rel_copied, random_id): |
| """ |
| """ |
| target_col, features_col = 'y', 'x' |
| plpy.execute("drop table if exists {0}".format(rel_copied)) |
| plpy.execute(""" |
| select setseed(0.5); |
| create temp table {rel_copied} as |
| select |
| row_number() over (order by random()) as {random_id}, |
| {dependent_varname} as {target_col}, |
| {independent_varname} as {features_col} |
| from {rel_origin} |
| """.format(rel_origin=rel_origin, |
| rel_copied=rel_copied, |
| random_id=random_id, |
| dependent_varname=dependent_varname, |
| independent_varname=independent_varname, |
| target_col=target_col, features_col=features_col)) |
| return target_col, features_col |
| # ------------------------------------------------------------------------- |
| |
| |
| def _cv_split_data(rel_source, col_data, col_id, row_num, |
| rel_train, rel_valid, n_folds, which_fold): |
| |
| col_string = _cv_col_string(rel_source, col_data, [col_id]) |
| |
| (start_row, end_row) = _cv_validation_rows(row_num, n_folds, which_fold) |
| kwargs = dict(rel_train=rel_train, rel_source=rel_source, |
| col_id=col_id, start_row=start_row, |
| rel_valid=rel_valid, |
| end_row=end_row, col_string=col_string) |
| # Extract the training part of data, |
| # which corresponds to rows outside of [start_row, end_row). |
| # Extract the validation part of data, |
| # which corresponds to rows inside of [start_row, end_row). |
| plpy.execute("drop view if exists {0}".format(rel_train)) |
| plpy.execute(""" |
| create temp view {rel_train} as |
| select {col_id}, {col_string} from {rel_source} |
| where {col_id} < {start_row} |
| or {col_id} >= {end_row} |
| """.format(**kwargs)) |
| plpy.execute("drop view if exists {0}".format(rel_valid)) |
| plpy.execute(""" |
| create temp view {rel_valid} as |
| select {col_id}, {col_string} from {rel_source} |
| where {col_id} >= {start_row} |
| and {col_id} < {end_row} |
| """.format(**kwargs)) |
| return None |
| # ------------------------------------------------------------------------------ |
| |
| |
| def _cv_del_split_data(rel_train, rel_valid): |
| plpy.execute("drop view if exists {0} cascade".format(rel_train)) |
| plpy.execute("drop view if exists {0} cascade".format(rel_valid)) |
| # ------------------------------------------------------------------------------ |
| |
| |
| class CrossValidator(object): |
| """ |
| Cross validation for estimator |
| |
| Parameters |
| ========== |
| estimator : estimator function |
| The arguments to estimator are contained in args, e.g., estimator(**args) |
| |
| predictor : predictor function |
| Arguments: |
| |
| - schema_madlib: see args for details |
| - model_table: see args for details |
| - rel_valid: name of data table to be tested |
| - col_id: columns containing unique id for each data point |
| - output_table: table created for the testing results |
| |
| scorer : string, either 'classification' or 'regression' |
| Indicate the scoring method to be used. |
| |
| args : dict (recursive) |
| Contains all the arguments to run estimator and the data to be used for validation: |
| |
| Following names are assumed to be available in the args dict: |
| - source_table: the data table |
| - independent_varname: the column for features |
| - dependent_varname: the column for target |
| - schema_madlib: the schema where madlib is installed |
| - model_table: table created for the trained model |
| |
| """ |
| def __init__(self, estimator, predictor, scorer, args): |
| self._cv_args = _ValidationArgs(args) |
| self._estimator = estimator |
| self._predictor = predictor |
| self._scorer = scorer |
| self._target_col = None |
| self._features_col = None |
| self._set_data(**args) |
| |
| def _set_data(self, source_table, |
| independent_varname, |
| dependent_varname, **kwargs): |
| self._col_data = _extract_data_cols(dependent_varname, |
| independent_varname) |
| self._col_id = unique_string(desp='col_id') |
| self._rel_copied = unique_string(desp='rel_copied') |
| self._row_num = num_samples(source_table) |
| y, x = _cv_copy_data(source_table, dependent_varname, |
| independent_varname, |
| self._rel_copied, self._col_id) |
| self._target_col, self._features_col = y, x |
| |
| def _gen_split_data(self, n_folds): |
| rel_copied = self._rel_copied |
| col_data = [self._target_col, self._features_col] |
| col_id = self._col_id |
| row_num = self._row_num |
| SplitData = namedtuple('SplitData', 'rel_train, rel_valid') |
| for k in range(n_folds): |
| rel_train = unique_string(desp='cv_train_{0}_'.format(k)) |
| rel_valid = unique_string(desp='cv_valid_{0}_'.format(k)) |
| _cv_split_data(rel_copied, col_data, col_id, row_num, |
| rel_train, rel_valid, n_folds, k+1) |
| yield SplitData(rel_train=rel_train, rel_valid=rel_valid) |
| |
| def _test_one_fold(self, split_data, sub_args): |
| col_id = self._col_id |
| estimator = self._estimator |
| predictor = self._predictor |
| scorer = self._scorer |
| rel_train = split_data.rel_train |
| rel_valid = split_data.rel_valid |
| args = self._cv_args.make_from(source_table=rel_train, |
| dependent_varname=self._target_col, |
| independent_varname=self._features_col, |
| **sub_args) |
| estimator(**args) |
| |
| schema_madlib = args['schema_madlib'] |
| output_table = unique_string(desp='output_table') |
| model_table = args['model_table'] |
| |
| predictor(schema_madlib, model_table, rel_valid, col_id, output_table) |
| |
| score = self._score(output_table, rel_valid, scorer) |
| plpy.execute("DROP TABLE IF EXISTS {0}, {1}". |
| format(model_table, add_postfix(model_table, "_summary"))) |
| plpy.execute("DROP TABLE IF EXISTS {0}".format(output_table)) |
| return score |
| |
| def _score(self, pred, orig, method): |
| target = self._target_col |
| col_id = self._col_id |
| if method == 'regression': |
| return plpy.execute( |
| """ |
| SELECT |
| -sqrt(avg(({target}-prediction)^2)) AS accuracy |
| FROM {pred} JOIN {orig} |
| ON {pred}.{id} = {orig}.{id} |
| """.format(pred=pred, |
| orig=orig, |
| id=col_id, |
| target=target))[0]['accuracy'] |
| elif method == 'classification': |
| return plpy.execute( |
| """ |
| SELECT (1 - miss / total) AS accuracy |
| FROM |
| ( |
| SELECT count(*)::float8 AS miss |
| FROM {pred} JOIN {orig} |
| ON {pred}.{id} = {orig}.{id} |
| WHERE prediction <> {target}) s, |
| ( |
| SELECT count(*)::float8 AS total |
| FROM {orig}) r; |
| """.format(pred=pred, |
| orig=orig, |
| id=col_id, |
| target=target))[0]['accuracy'] |
| else: |
| plpy.error("Cross Validation Error: invalid method value ({0})! " |
| "Need to be either classification " |
| "or regression!".format(method)) |
| |
| def validate(self, sub_args, n_folds=3): |
| """Returns the results of cross validation |
| |
| Parameters |
| ========== |
| sub_args : dict (recursive) |
| Arguments to be validated. Multiple values are provided in a list, e.g., |
| |
| sub_args = { |
| 'lambda': [0.1, 1, 10], |
| 'epsilon': [0.1, 1, 10] |
| } |
| |
| Before running estimator, sub_args_single is generated from sub_args replacing the lists with single value for each argument and args is updated recursively using sub_args_single. |
| |
| n_folds : int, default=3 |
| Number of folds. Must be at least 2 |
| |
| Returns |
| ======= |
| validation_result : object |
| See class ValidationResult for more details |
| """ |
| def _stats(nums): |
| n = len(nums) |
| # no need to check against 0 division |
| # because n_folds is larger than 1 |
| mean = sum(nums) / n |
| std_dev = sqrt(sum((x - mean)**2 for x in nums) / (n - 1)) |
| return mean, std_dev |
| |
| if not sub_args: |
| return [] |
| cv_history = ValidationResult() |
| all_split_data = list(self._gen_split_data(n_folds)) |
| tof = self._test_one_fold |
| for sa in _ValidationArgs.grid(sub_args): |
| scores = [tof(i, sub_args=sa) for i in all_split_data] |
| a, s = _stats(scores) |
| cv_history.add_one(mean=a, std=s, sub_args=sa) |
| for each_split in all_split_data: |
| _cv_del_split_data(each_split.rel_train, each_split.rel_valid) |
| return cv_history |