blob: 879c0d6ad5afb351ae40e481a517f7b38d825ca7 [file] [log] [blame]
import plpy
from utilities.utilities import unique_string, _cast_if_null
from validation.cv_utils import __cv_copy_data_with_id_compute
from validation.cv_utils import __cv_split_data_using_id_col_compute
from validation.cv_utils import __cv_split_data_using_id_tbl_compute
from validation.cv_utils import __cv_generate_random_id
from utilities.utilities import __mad_version
from utilities.utilities import split_quoted_delimited_str
# Version helper obtained from utilities.utilities.
version_wrapper = __mad_version()
# Conversion routine selected by the version helper. In this file it is
# applied (with text=False) to the 'mean' and 'std' values returned by the
# scales query in __utils_ind_var_scales.
mad_vec = version_wrapper.select_vecfunc()
# ========================================================================
def __utils_ind_var_scales(tbl_data, col_ind_var, dimension, schema_madlib):
    """
    Compute the mean and standard deviation for each dimension of an array
    stored in a column.

    Parameters:
    tbl_data -- table to read from
    col_ind_var -- array column (or expression) to summarize
    dimension -- passed as the second argument of utils_var_scales
    schema_madlib -- schema that holds the MADlib SQL functions

    Returns:
    The first row of the query result, with its 'mean' and 'std' entries
    replaced by mad_vec(value, text=False).
    """
    scales_sql = """
SELECT (f).*
FROM (
SELECT
{schema_madlib}.__utils_var_scales_result(
{schema_madlib}.utils_var_scales({col_ind_var}, {dimension})) as f
FROM
{tbl_data}
) q2
""".format(schema_madlib=schema_madlib, col_ind_var=col_ind_var,
           dimension=dimension, tbl_data=tbl_data)
    scales_row = plpy.execute(scales_sql)[0]
    # Convert both array-valued columns with the version-specific routine.
    for key in ("mean", "std"):
        scales_row[key] = mad_vec(scales_row[key], text=False)
    return scales_row
# ========================================================================
def __utils_ind_var_scales_grouping(tbl_data, col_ind_var, dimension,
                                    schema_madlib, grouping_col, x_mean_table):
    """
    Create a temp table holding, for each group, the mean and standard
    deviation of each dimension of an array stored in a column.

    Parameters:
    tbl_data -- table to read from
    col_ind_var -- array column (or expression) to summarize
    dimension -- passed as the second argument of utils_var_scales
    schema_madlib -- schema that holds the MADlib SQL functions
    grouping_col -- the columns to group the data on
    x_mean_table -- name of the temp table to create

    Returns:
    None. The only effect is the creation of x_mean_table, which contains
    the fields of the scales result plus the grouping column(s).
    """
    # NOTE(review): _cast_if_null presumably substitutes a NULL placeholder
    # (aliased with the unique name) when grouping_col is empty -- confirm
    # against utilities.utilities.
    group_col = _cast_if_null(grouping_col, unique_string('grp_col'))
    # CREATE TABLE yields no rows of interest, so the result is not kept.
    plpy.execute(
        """
CREATE TEMP TABLE {x_mean_table} AS
SELECT (f).*, {group_col}
FROM (
SELECT {group_col},
{schema_madlib}.__utils_var_scales_result(
{schema_madlib}.utils_var_scales({col_ind_var}, {dimension})) as f
FROM
{tbl_data}
GROUP BY {group_col}
) q2
""".format(x_mean_table=x_mean_table, group_col=group_col,
           schema_madlib=schema_madlib, col_ind_var=col_ind_var,
           dimension=dimension, tbl_data=tbl_data))
# ========================================================================
def __utils_dep_var_scale(schema_madlib, tbl_data, col_ind_var,
                          col_dep_var):
    """
    Compute the scale of the dependent variable, which is a scalar in ridge
    and lasso. This function is also used in lasso.

    The mean is averaged only over rows whose independent-variable array
    contains no NULL; the std column is the constant 1.

    Parameters:
    schema_madlib -- madlib schema
    tbl_data -- original data
    col_ind_var -- independent variables column
    col_dep_var -- dependent variable column

    Returns:
    The first row of the query result, with columns 'mean' and 'std'.
    """
    scale_sql = """
SELECT
avg(CASE WHEN NOT {schema_madlib}.array_contains_null({col_ind_var}) THEN {col_dep_var} END) AS mean,
1 AS std
FROM {tbl_data}
""".format(schema_madlib=schema_madlib, col_ind_var=col_ind_var,
           col_dep_var=col_dep_var, tbl_data=tbl_data)
    result_rows = plpy.execute(scale_sql)
    return result_rows[0]
# ========================================================================
def __utils_dep_var_scale_grouping(y_mean_table, tbl_data, grouping_col,
                                   family, schema_madlib=None, col_ind_var=None, col_dep_var=None):
    """
    Create a temp table with the mean and std of the dependent variable for
    each group. The dependent variable is a scalar in ridge and lasso; this
    function is also used in lasso.

    For the binomial family the mean is the constant 0. For any other family
    the mean is averaged over rows whose independent-variable array contains
    no NULL. The std column is always the constant 1.

    Parameters:
    y_mean_table -- name of the output table to write into
    tbl_data -- input table
    grouping_col -- the columns to group the data on
    family -- unless it is 'binomial', ALL following parameters must be given
    schema_madlib -- madlib schema
    col_ind_var -- independent variables column
    col_dep_var -- dependent variable column
    """
    group_col = _cast_if_null(grouping_col, unique_string('grp_col'))
    if family != 'binomial':
        # The averaging expression needs all three optional arguments.
        needed = (schema_madlib, col_ind_var, col_dep_var)
        if any(value is None for value in needed):
            plpy.error("Schema name, indpendent column and dependent column names required.")
        mean_str = ' avg(CASE WHEN NOT {0}.array_contains_null({1}) THEN {2} END) '.format(
            *needed)
    else:
        mean_str = '0'
    create_sql = """
CREATE TEMP TABLE {y_mean_table} AS
SELECT {group_col},
{mean_str} AS mean,
1 AS std
FROM {tbl_data}
GROUP BY {group_col}
""".format(y_mean_table=y_mean_table, group_col=group_col,
           mean_str=mean_str, tbl_data=tbl_data)
    plpy.execute(create_sql)
# ========================================================================
def __utils_normalize_data_grouping(y_decenter=True, **kwargs):
    """
    Create a temp table of scaled independent and dependent variables, using
    the per-group mean/std tables for 'x' and 'y'. This function is also used
    in lasso.

    Each data row is joined to its group's row in x_mean_table and
    y_mean_table. The independent variable is scaled by the SQL function
    utils_normalize_data with that group's mean and std arrays; the dependent
    variable has the group's mean subtracted when y_decenter is true and is
    passed through unchanged otherwise.

    Parameters:
    y_decenter -- whether to subtract the group mean from the dependent variable

    Keys read from kwargs:
    schema_madlib -- madlib schema
    tbl_data -- original data
    col_ind_var -- independent variables column
    col_dep_var -- dependent variable column
    tbl_data_scaled -- name of the scaled data table to create
    col_ind_var_norm_new -- output name for the scaled array, so that
        array[...] expressions can be used as col_ind_var
    col_dep_var_norm_new -- output name for the (de-centered) dependent variable
    x_mean_table -- name of the table containing mean of 'x' for each group
    y_mean_table -- name of the table containing mean of 'y' for each group
    grouping_col -- columns to group the data on
    """
    group_col = kwargs.get('grouping_col')
    # NOTE(review): assumes split_quoted_delimited_str returns one entry per
    # grouping column name -- confirm against utilities.utilities.
    group_col_list = split_quoted_delimited_str(group_col)
    tbl_data = kwargs['tbl_data']

    group_where_x = ' AND '.join(
        '{0}.{1}=__x__.{1}'.format(tbl_data, grp) for grp in group_col_list)
    group_where_y = ' AND '.join(
        '{0}.{1}=__y__.{1}'.format(tbl_data, grp) for grp in group_col_list)
    # Qualify EVERY grouping column with the data table. The same columns also
    # exist in __x__ and __y__, so an unqualified second or later column would
    # be an ambiguous reference in the joined query.
    group_select = ', '.join(
        '{0}.{1}'.format(tbl_data, grp) for grp in group_col_list)

    ydecenter_str = "- __y__.mean" if y_decenter else ""
    plpy.execute(
        """
CREATE TEMP TABLE {tbl_data_scaled} AS
SELECT
({schema_madlib}.utils_normalize_data({col_ind_var},
__x__.mean::double precision[],
__x__.std::double precision[]))
AS {col_ind_var_norm_new},
({col_dep_var} {ydecenter_str}) AS {col_dep_var_norm_new},
{group_select}
FROM {tbl_data}
INNER JOIN {x_mean_table} AS __x__ ON {group_where_x}
INNER JOIN {y_mean_table} AS __y__ ON {group_where_y}
""".format(ydecenter_str=ydecenter_str, group_select=group_select,
           group_where_x=group_where_x, group_where_y=group_where_y, **kwargs))
    return None
# ========================================================================
def __utils_normalize_data(y_decenter=True, **kwargs):
    """
    Create a temp table of scaled independent and dependent variables.
    This function is also used in lasso.

    The independent variable is scaled by the SQL function
    utils_normalize_data using the mean and std arrays supplied as text. The
    dependent variable has y_mean subtracted when y_decenter is true and is
    passed through unchanged otherwise.

    Parameters:
    y_decenter -- whether to subtract y_mean from the dependent variable

    Keys read from kwargs:
    schema_madlib -- madlib schema
    tbl_data -- original data
    col_ind_var -- independent variables column
    col_dep_var -- dependent variable column
    x_mean_str -- array literal with the independent variable means
    x_std_str -- array literal with the independent variable stds
    y_mean -- dependent variable mean (only read when y_decenter is true)
    tbl_data_scaled -- name of the scaled data table to create
    col_ind_var_norm_new -- output name for the scaled array, so that
        array[...] expressions can be used as col_ind_var
    col_dep_var_norm_new -- output name for the (de-centered) dependent variable
    """
    if y_decenter:
        ydecenter_str = "- {y_mean}".format(**kwargs)
    else:
        ydecenter_str = ""
    scaling_sql = """
CREATE TEMP TABLE {tbl_data_scaled} AS
SELECT
({schema_madlib}.utils_normalize_data({col_ind_var},
'{x_mean_str}'::double precision[],
'{x_std_str}'::double precision[]))
AS {col_ind_var_norm_new},
({col_dep_var} {ydecenter_str}) AS {col_dep_var_norm_new}
FROM {tbl_data}
""".format(ydecenter_str=ydecenter_str, **kwargs)
    plpy.execute(scaling_sql)
# ========================================================================
def __utils_cv_preprocess(kwargs):
    """
    Common setup used by both the ridge and lasso cross validation functions.

    Steps performed:
      * generate unique table/column names and add them to kwargs,
      * copy the data (adding a random ID) when no ID column is given, or
        create a random ID mapping table when the given ID is not random,
      * record the row count and independent-variable array length,
      * create the temp table that stores all fitting coefficients.

    kwargs is updated in place; nothing is returned.

    Keys read from kwargs:
    data_tbl -- table containing the data, which will be split into
        training and validation parts
    data_id -- unique ID column provided by the user, or None
    id_is_random -- whether the ID provided by the user is random
    col_ind_var -- independent variables column (may be an array expression)
    col_dep_var -- dependent variable column
    fold_num, upto_fold -- must be present; they are not validated or
        otherwise used here

    Keys added to kwargs:
    the generated names listed in the body, col_ind_var_cp_new,
    col_dep_var_cp_new, tbl_used, row_num and dimension. col_random_id is
    overwritten with data_id when the user's ID is already random.
    """
    data_tbl = kwargs["data_tbl"]
    data_id = kwargs["data_id"]
    id_is_random = kwargs["id_is_random"]
    col_ind_var = kwargs["col_ind_var"]
    col_dep_var = kwargs["col_dep_var"]
    # Not used below, but a missing key must still fail with KeyError as it
    # always has.
    for required_key in ("fold_num", "upto_fold"):
        if required_key not in kwargs:
            raise KeyError(required_key)

    # Every name below receives its own unique_string() value.
    generated_names = (
        "col_dep_var_cv_new",    # dependent column after data copy & split
        "col_dep_var_norm_new",  # dependent column after normalization
        "tbl_all_data",          # copied table, used when data must be copied
        "tbl_inter",             # table name before normalization
        "tbl_train",             # table name for training
        "tbl_valid",             # table name for validation
        "col_random_id",         # column name for random id
        "tbl_random_id",         # random ID mapping table, used when data_id
                                 # is not None and id_is_random is False
        "tbl_accum_error",       # accumulates the error information
        "tbl_ind_scales",        # independent variable (array) scales,
                                 # including means and stds
        "tbl_dep_scale",         # dependent variable scale (mean and std)
        "tbl_coef",              # fitting coefficients from all validations
                                 # for all parameter values
        "col_ind_var_cv_new",    # independent column after data copy & split
        "col_ind_var_norm_new",  # independent column after normalization
    )
    generated = dict((name, unique_string()) for name in generated_names)
    kwargs.update(generated)

    if data_id is None:
        # Unique ID column is not given: copy the data and create the ID.
        # The CV function cannot be used here, because of possible array
        # expressions. Keyword arguments keep each value bound to the
        # parameter it is meant for.
        __utils_cv_copy_data_with_id(
            rel_origin=data_tbl,
            col_dep_var=col_dep_var,
            col_dep_var_cv_new=generated["col_dep_var_cv_new"],
            col_ind_var=col_ind_var,
            col_ind_var_cv_new=generated["col_ind_var_cv_new"],
            rel_copied=generated["tbl_all_data"],
            random_id=generated["col_random_id"])
        tbl_used = generated["tbl_all_data"]
        kwargs["col_ind_var_cp_new"] = generated["col_ind_var_cv_new"]
        kwargs["col_dep_var_cp_new"] = generated["col_dep_var_cv_new"]
    elif id_is_random:
        # Unique ID column is given and is random: use the data as it is.
        tbl_used = data_tbl
        kwargs["col_random_id"] = data_id
        kwargs["col_ind_var_cp_new"] = col_ind_var
        kwargs["col_dep_var_cp_new"] = col_dep_var
    else:
        # The provided unique ID is not random: create a table mapping the
        # given ID to a random ID.
        __cv_generate_random_id(data_tbl, data_id,
                                generated["tbl_random_id"],
                                generated["col_random_id"])
        tbl_used = data_tbl
        kwargs["col_ind_var_cp_new"] = col_ind_var
        kwargs["col_dep_var_cp_new"] = col_dep_var

    # original data row number
    row_num = plpy.execute(
        "select count(*) as row_num from {data_tbl}".format(**kwargs))[0]["row_num"]
    # length of the independent variable array
    dimension = plpy.execute(
        "select max(array_upper({col_ind_var},1)) as dimension from {data_tbl}".format(**kwargs))[0]["dimension"]
    kwargs.update(dict(tbl_used=tbl_used, row_num=row_num, dimension=dimension))

    # Table to append all fitting results, which are distinguished by id.
    plpy.execute("""
drop table if exists {tbl_coef};
create temp table {tbl_coef} (id integer, coef double precision[], intercept double precision)
""".format(**kwargs))
    return None
## ========================================================================
def __utils_accumulate_error(accum_count, tbl_accum_error, param_value, error):
    """
    Record one measured error for one lambda value. Needed by both the ridge
    and lasso cross validation functions.

    Parameters:
    accum_count -- when equal to 1, the accumulation table is created first
    tbl_accum_error -- name of the temp table collecting the errors
    param_value -- value stored in the 'lambda' column
    error -- value stored in the 'mean_squared_error' column
    """
    first_record = (accum_count == 1)
    if first_record:
        create_stmt = "create temp table {tbl_accum_error} (lambda double precision, mean_squared_error double precision)".format(
            tbl_accum_error=tbl_accum_error)
        plpy.execute(create_stmt)
    insert_stmt = "insert into {tbl_accum_error} values ({param_value}, {error})".format(
        tbl_accum_error=tbl_accum_error,
        param_value=param_value,
        error=error)
    plpy.execute(insert_stmt)
## ========================================================================
def __utils_cv_produce_col_name_string(col_dep_var, col_dep_var_cv_new,
                                       col_ind_var, col_ind_var_cv_new):
    """
    Build the select-list string "<dep> as <dep_new>, <ind> as <ind_new>",
    so that only the necessary data is selected when copying is inevitable.

    @param col_dep_var Name of dependent variable
    @param col_dep_var_cv_new A single string to rename the dependent
        variable
    @param col_ind_var Name of independent variable, might be an
        array expression
    @param col_ind_var_cv_new A single string to rename the independent
        variable, so that we can refer to the possible array expression
    @return The combined select-list string
    """
    # Parentheses keep both halves in ONE expression; without them the second
    # line is not part of the returned value.
    return (col_dep_var + " as " + col_dep_var_cv_new + ", "
            + col_ind_var + " as " + col_ind_var_cv_new)
# ========================================================================
def __utils_cv_copy_data_with_id(rel_origin, col_dep_var, col_dep_var_cv_new,
                                 col_ind_var, col_ind_var_cv_new, rel_copied,
                                 random_id):
    """
    Copy the data table and create a random ID column with it. Used when the
    user does not provide an ID column.

    The column list comes from __utils_cv_produce_col_name_string and the
    copy itself is delegated to __cv_copy_data_with_id_compute.

    @param rel_origin Original table name, a string
    @param col_dep_var Name of dependent variable
    @param col_dep_var_cv_new A single string to rename the dependent variable
    @param col_ind_var Name of independent variable, might be an
        array expression
    @param col_ind_var_cv_new A single string to rename the independent
        variable, so that we can refer to the possible array expression
    @param rel_copied Name of the table that copies the data from the
        original table
    @param random_id Column name for random unique ID in the newly created
        copied table
    """
    # Select only the columns that will be used in the computation.
    selected_columns = __utils_cv_produce_col_name_string(
        col_dep_var=col_dep_var,
        col_dep_var_cv_new=col_dep_var_cv_new,
        col_ind_var=col_ind_var,
        col_ind_var_cv_new=col_ind_var_cv_new)
    __cv_copy_data_with_id_compute(rel_origin, selected_columns,
                                   rel_copied, random_id)
# ========================================================================
def __utils_cv_split_data_using_id_col(rel_source, col_dep_var, col_dep_var_cv_new,
                                       col_ind_var,
                                       col_ind_var_cv_new, col_id, row_num,
                                       rel_train, rel_valid, fold_num, which_fold):
    """
    Split the data into training and validation parts using a random ID
    column that already exists (either originally, or created during copying).

    The column list comes from __utils_cv_produce_col_name_string and the
    split itself is delegated to __cv_split_data_using_id_col_compute.

    @param rel_source Name of data source table
    @param col_dep_var Name of dependent variable
    @param col_dep_var_cv_new A single string to rename the dependent variable
    @param col_ind_var Name of independent variable, might be an
        array expression
    @param col_ind_var_cv_new A single string to rename the independent
        variable
    @param col_id Name of the unique ID column
    @param row_num Total number of rows
    @param rel_train Name of training data table
    @param rel_valid Name of validation data table
    @param fold_num How many fold cross validation
    @param which_fold Which fold will be used as validation part?
    """
    selected_columns = __utils_cv_produce_col_name_string(
        col_dep_var=col_dep_var,
        col_dep_var_cv_new=col_dep_var_cv_new,
        col_ind_var=col_ind_var,
        col_ind_var_cv_new=col_ind_var_cv_new)
    __cv_split_data_using_id_col_compute(
        rel_source, selected_columns, col_id, row_num,
        rel_train, rel_valid, fold_num, which_fold)
# ========================================================================
def __utils_cv_split_data_using_id_tbl(rel_origin, col_dep_var, col_dep_var_cv_new,
                                       col_ind_var,
                                       col_ind_var_cv_new, rel_random_id,
                                       random_id, origin_id, row_num, rel_train,
                                       rel_valid, fold_num, which_fold):
    """
    Split the data table using a random ID mapping table.

    Used when a unique ID column exists in the original table but is not
    randomly assigned, so a table mapping this non-random ID to a real random
    ID has been created by __cv_generate_random_id. The column list comes
    from __utils_cv_produce_col_name_string and the split itself is delegated
    to __cv_split_data_using_id_tbl_compute.

    @param rel_origin Name of the original data table
    @param col_dep_var Name of dependent variable
    @param col_dep_var_cv_new A single string to rename the dependent variable
    @param col_ind_var Name of independent variable, might be an
        array expression
    @param col_ind_var_cv_new A single string to rename the independent
        variable
    @param rel_random_id Name of the random ID mapping table
    @param random_id Name of random ID column in the table rel_random_id
    @param origin_id Name of the original non-random ID column in rel_origin
        and rel_random_id
    @param row_num Total number of rows
    @param rel_train Name of training data table
    @param rel_valid Name of validation data table
    @param fold_num How many fold cross validation
    @param which_fold Which fold will be used as validation part?
    """
    selected_columns = __utils_cv_produce_col_name_string(
        col_dep_var=col_dep_var,
        col_dep_var_cv_new=col_dep_var_cv_new,
        col_ind_var=col_ind_var,
        col_ind_var_cv_new=col_ind_var_cv_new)
    __cv_split_data_using_id_tbl_compute(
        rel_origin, selected_columns, rel_random_id,
        random_id, origin_id, row_num, rel_train,
        rel_valid, fold_num, which_fold)