# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import plpy
from utilities.utilities import __mad_version
from utilities.utilities import unique_string
from utilities.utilities import num_samples
from utilities.utilities import add_postfix
from validation.cv_utils import __cv_produce_col_name_string as _cv_col_string
from validation.cv_utils import __cv_validation_rows as _cv_validation_rows
from math import sqrt
from collections import namedtuple
from operator import itemgetter
from itertools import product, repeat, chain
version_wrapper = __mad_version()
mad_vec = version_wrapper.select_vecfunc()
def _extract_data_cols(y, x):
""" Extract independent data columns from ARRAY[...] and combine it with dependent data column
Parameters
==========
y : string
The dependent data column
x : string
A string of such form: ARRAY[1,?...] where ? indicate 0 or 1 repetitions of the preceding sequence
Returns
=======
columns : a list of strings corresponding to column names.
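
    Example (illustrative; x1 and x2 are placeholder column names)
    ==============================================================
    _extract_data_cols('y', 'ARRAY[1,x1,x2]')   returns ['y', 'x1', 'x2']
    _extract_data_cols('y', 'x1,x2')            returns ['y', 'x1', 'x2']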
"""
if not x.startswith('ARRAY'):
return [y] + x.split(',')
import re
m = re.search(r'\[((1,)?(?P<cols>.+))\]', x)
if not m:
plpy.error("SVM error: invalid ({0}) "
"for cross validation!".format(x))
return [y] + m.group('cols').split(',')
class ValidationResult(object):
"""Wrapper class for cross validation results
Parameters
==========
cv_history : list, optional
List of dictionaries.
Each dictionary contains the following three keys:
- mean_score: float, average of scores using sub_args
- std_dev_score: float, standard deviation of scores using sub_args
- sub_args: dict, the values of arguments being validated
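
    Example (illustrative, with made-up scores)
    ===========================================
    res = ValidationResult()
    res.add_one(mean=0.92, std=0.03, sub_args={'lambda': 0.1})
    res.add_one(mean=0.88, std=0.05, sub_args={'lambda': 1})
    res.top('sub_args')     # returns {'lambda': 0.1}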
"""
def __init__(self, cv_history=None):
if cv_history is None:
cv_history = []
self._cv_history = cv_history
def _get_leafs(self, sub_args):
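        """Return the (key, value) pairs at the leaves of a nested dict"""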
def _run(sub_args):
a = []
for k, v in sub_args.iteritems():
if isinstance(v, dict):
a.extend(_run(v))
else:
a.append((k, v))
return a
return _run(sub_args)
def _flatten(self):
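        """Return cv_history as a list of flat dicts, with the leaves of each
        record's sub_args promoted to top-level keys"""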
a = []
for h in self._cv_history:
h = dict(h)
sub_args = h.pop('sub_args')
h.update(dict(self._get_leafs(sub_args)))
a.append(h)
return a
def add_one(self, mean, std, sub_args):
"""Add one record to the history"""
record = dict(mean_score=mean, std_dev_score=std, sub_args=sub_args)
self._cv_history.append(record)
def sorted(self):
"""Sort the history w.r.t. mean value and return a new ValidationResult object"""
ch = sorted(self._cv_history,
reverse=True,
key=itemgetter('mean_score'))
return ValidationResult(ch)
def first(self, attr=None):
"""Return the attribute of the first record of history
Parameters
==========
attr : string, optional
Any string in {'mean_score', 'std_dev_score', 'sub_args'} or None
Returns
=======
record : dict or float.
The return value is either the first record, or the value of the corresponding attr in the first record.
"""
if attr is None:
return self._cv_history[0]
else:
return self._cv_history[0].get(attr)
def top(self, attr=None):
"""Return the first after sort"""
svr = self.sorted()
return svr.first(attr)
# output cv results as a SQL table
def output_tbl(self, tbl_name):
"""Create a table tbl_name that contains the history
The columns of tbl_name are mean_score, std_dev_score and the leaf keys in sub_args.
All column types are assumed to be double precision.
"""
if not tbl_name or not str(tbl_name).strip():
return
header = (self._cv_history[0]['sub_args'].keys() +
['mean_score', 'std_dev_score'])
header_str = ','.join(map(str, header))
data = []
for h in self._flatten():
values = ','.join([str(h[k]) for k in header])
data.append("({0})".format(values))
data = ','.join(data)
plpy.execute("""
CREATE TABLE {tbl_name} ({header_str}) AS
VALUES
{data}
""".
format(tbl_name=tbl_name, header_str=header_str, data=data))
class _ValidationArgs(object):
"""docstring for _ValidationArgs"""
def __init__(self, args):
self._args = args
@classmethod
def grid(cls, sub_args):
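        """Expand a (possibly nested) dict of candidate-value lists into a
        list of all their combinations (a cartesian product), preserving the
        nesting structure. Values that are neither a list nor a dict are
        ignored.

        Example (illustrative; result order depends on dict iteration order):
            grid({'lambda': [0.1, 1], 'kernel_params': {'gamma': [0.5]}})
            returns [{'lambda': 0.1, 'kernel_params': {'gamma': 0.5}},
                     {'lambda': 1, 'kernel_params': {'gamma': 0.5}}]
        """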
def combine_dict(dicts):
# same as dict((k, v) for d in dicts for k, v in d.iteritems())
return dict(chain.from_iterable(d.iteritems() for d in dicts))
def make_dicts(k, vs):
return [dict([t]) for t in zip(repeat(k), vs)]
a = []
for k, v in sub_args.iteritems():
if isinstance(v, list):
a.append(make_dicts(k, v))
elif isinstance(v, dict):
a.append(make_dicts(k, cls.grid(v)))
tuples = product(*a)
return map(combine_dict, tuples)
def make_from(self, **kwargs):
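        """Return a copy of the stored args, recursively updated with kwargs

        Nested dicts are merged key by key rather than replaced wholesale,
        so (illustratively) make_from(kernel_params={'gamma': 0.5}) overrides
        only 'gamma' inside an existing 'kernel_params' dict.
        """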
def _update_dict(d1, d2):
if not isinstance(d1, dict):
raise TypeError("{0} is not dict".format(type(d1)))
if not isinstance(d2, dict):
raise TypeError("{0} is not dict".format(type(d2)))
for k, v in d2.iteritems():
if (isinstance(v, dict) and
k in d1 and
isinstance(d1[k], dict)):
_update_dict(d1[k], v)
else:
d1[k] = v
args = dict(self._args)
_update_dict(args, kwargs)
return args
def _cv_copy_data(rel_origin, dependent_varname,
independent_varname, rel_copied, random_id):
"""
"""
target_col, features_col = 'y', 'x'
plpy.execute("drop table if exists {0}".format(rel_copied))
plpy.execute("""
select setseed(0.5);
create temp table {rel_copied} as
select
row_number() over (order by random()) as {random_id},
{dependent_varname} as {target_col},
{independent_varname} as {features_col}
from {rel_origin}
""".format(rel_origin=rel_origin,
rel_copied=rel_copied,
random_id=random_id,
dependent_varname=dependent_varname,
independent_varname=independent_varname,
target_col=target_col, features_col=features_col))
return target_col, features_col
# -------------------------------------------------------------------------
def _cv_split_data(rel_source, col_data, col_id, row_num,
rel_train, rel_valid, n_folds, which_fold):
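    """Create a temp view for the training rows and one for the validation
    rows of fold which_fold (1-based) out of n_folds, splitting on col_id"""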
col_string = _cv_col_string(rel_source, col_data, [col_id])
(start_row, end_row) = _cv_validation_rows(row_num, n_folds, which_fold)
kwargs = dict(rel_train=rel_train, rel_source=rel_source,
col_id=col_id, start_row=start_row,
rel_valid=rel_valid,
end_row=end_row, col_string=col_string)
    # Extract the training part of data,
    # which corresponds to rows outside of [start_row, end_row).
plpy.execute("drop view if exists {0}".format(rel_train))
plpy.execute("""
create temp view {rel_train} as
select {col_id}, {col_string} from {rel_source}
where {col_id} < {start_row}
or {col_id} >= {end_row}
""".format(**kwargs))
plpy.execute("drop view if exists {0}".format(rel_valid))
plpy.execute("""
create temp view {rel_valid} as
select {col_id}, {col_string} from {rel_source}
where {col_id} >= {start_row}
and {col_id} < {end_row}
""".format(**kwargs))
return None
# ------------------------------------------------------------------------------
def _cv_del_split_data(rel_train, rel_valid):
plpy.execute("drop view if exists {0} cascade".format(rel_train))
plpy.execute("drop view if exists {0} cascade".format(rel_valid))
# ------------------------------------------------------------------------------
class CrossValidator(object):
"""
Cross validation for estimator
Parameters
==========
estimator : estimator function
The arguments to estimator are contained in args, e.g., estimator(**args)
predictor : predictor function
Arguments:
- schema_madlib: see args for details
- model_table: see args for details
- rel_valid: name of data table to be tested
        - col_id: column containing a unique id for each data point
- output_table: table created for the testing results
    scorer : string, either 'classification' or 'regression'
        Indicates the scoring method to be used.
    args : dict (recursive)
        Contains all the arguments needed to run the estimator and the data
        to be used for validation. The following names are assumed to be
        available in the args dict:
- source_table: the data table
- independent_varname: the column for features
- dependent_varname: the column for target
- schema_madlib: the schema where madlib is installed
- model_table: table created for the trained model
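
    Example (a hypothetical sketch: svm_train and svm_predict stand in for
    real estimator/predictor callables, and args for a fully populated dict)
    ========================================================================
    cv = CrossValidator(svm_train, svm_predict, 'regression', args)
    res = cv.validate(sub_args={'lambda': [0.1, 1, 10]}, n_folds=5)
    best_params = res.top('sub_args')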
"""
def __init__(self, estimator, predictor, scorer, args):
self._cv_args = _ValidationArgs(args)
self._estimator = estimator
self._predictor = predictor
self._scorer = scorer
self._target_col = None
self._features_col = None
self._set_data(**args)
def _set_data(self, source_table,
independent_varname,
dependent_varname, **kwargs):
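        """Copy and shuffle the source data once, so that every fold can
        reuse the same copied relation"""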
self._col_data = _extract_data_cols(dependent_varname,
independent_varname)
self._col_id = unique_string(desp='col_id')
self._rel_copied = unique_string(desp='rel_copied')
self._row_num = num_samples(source_table)
y, x = _cv_copy_data(source_table, dependent_varname,
independent_varname,
self._rel_copied, self._col_id)
self._target_col, self._features_col = y, x
def _gen_split_data(self, n_folds):
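        """Yield one (rel_train, rel_valid) pair of view names per fold"""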
rel_copied = self._rel_copied
col_data = [self._target_col, self._features_col]
col_id = self._col_id
row_num = self._row_num
SplitData = namedtuple('SplitData', 'rel_train, rel_valid')
for k in range(n_folds):
rel_train = unique_string(desp='cv_train_{0}_'.format(k))
rel_valid = unique_string(desp='cv_valid_{0}_'.format(k))
_cv_split_data(rel_copied, col_data, col_id, row_num,
rel_train, rel_valid, n_folds, k+1)
yield SplitData(rel_train=rel_train, rel_valid=rel_valid)
def _test_one_fold(self, split_data, sub_args):
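        """Train on the fold's training split, score the model on its
        validation split, and drop the intermediate model and prediction
        tables"""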
col_id = self._col_id
estimator = self._estimator
predictor = self._predictor
scorer = self._scorer
rel_train = split_data.rel_train
rel_valid = split_data.rel_valid
args = self._cv_args.make_from(source_table=rel_train,
dependent_varname=self._target_col,
independent_varname=self._features_col,
**sub_args)
estimator(**args)
schema_madlib = args['schema_madlib']
output_table = unique_string(desp='output_table')
model_table = args['model_table']
predictor(schema_madlib, model_table, rel_valid, col_id, output_table)
score = self._score(output_table, rel_valid, scorer)
plpy.execute("DROP TABLE IF EXISTS {0}, {1}".
format(model_table, add_postfix(model_table, "_summary")))
plpy.execute("DROP TABLE IF EXISTS {0}".format(output_table))
return score
def _score(self, pred, orig, method):
target = self._target_col
col_id = self._col_id
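        # Both scoring methods return a value where larger is better:
        # 'regression' uses the negated RMSE and 'classification' the
        # fraction of correctly predicted rows.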
if method == 'regression':
return plpy.execute(
"""
SELECT
-sqrt(avg(({target}-prediction)^2)) AS accuracy
FROM {pred} JOIN {orig}
ON {pred}.{id} = {orig}.{id}
""".format(pred=pred,
orig=orig,
id=col_id,
target=target))[0]['accuracy']
elif method == 'classification':
return plpy.execute(
"""
SELECT (1 - miss / total) AS accuracy
FROM
(
SELECT count(*)::float8 AS miss
FROM {pred} JOIN {orig}
ON {pred}.{id} = {orig}.{id}
WHERE prediction <> {target}) s,
(
SELECT count(*)::float8 AS total
FROM {orig}) r;
""".format(pred=pred,
orig=orig,
id=col_id,
target=target))[0]['accuracy']
else:
plpy.error("Cross Validation Error: invalid method value ({0})! "
"Need to be either classification "
"or regression!".format(method))
def validate(self, sub_args, n_folds=3):
"""Returns the results of cross validation
Parameters
==========
sub_args : dict (recursive)
Arguments to be validated. Multiple values are provided in a list, e.g.,
sub_args = {
'lambda': [0.1, 1, 10],
'epsilon': [0.1, 1, 10]
}
            Before running the estimator, a flat combination (sub_args_single)
            is generated from sub_args by picking a single value from each
            list, and args is updated recursively using sub_args_single.
        n_folds : int, default=3
            Number of folds; must be at least 2.
Returns
=======
validation_result : object
See class ValidationResult for more details
"""
def _stats(nums):
n = len(nums)
            # no need to guard against division by zero
            # because n_folds is at least 2
mean = sum(nums) / n
std_dev = sqrt(sum((x - mean)**2 for x in nums) / (n - 1))
return mean, std_dev
        if not sub_args:
            return ValidationResult()
cv_history = ValidationResult()
all_split_data = list(self._gen_split_data(n_folds))
tof = self._test_one_fold
for sa in _ValidationArgs.grid(sub_args):
scores = [tof(i, sub_args=sa) for i in all_split_data]
a, s = _stats(scores)
cv_history.add_one(mean=a, std=s, sub_args=sa)
for each_split in all_split_data:
_cv_del_split_data(each_split.rel_train, each_split.rel_valid)
return cv_history