import plpy
from utilities.utilities import unique_string, _cast_if_null
from validation.cv_utils import __cv_copy_data_with_id_compute
from validation.cv_utils import __cv_split_data_using_id_col_compute
from validation.cv_utils import __cv_split_data_using_id_tbl_compute
from validation.cv_utils import __cv_generate_random_id
from utilities.utilities import __mad_version
from utilities.utilities import _check_groups
from utilities.utilities import get_table_qualified_col_str
from utilities.utilities import split_quoted_delimited_str
version_wrapper = __mad_version()
mad_vec = version_wrapper.select_vecfunc()
# ========================================================================
def utils_ind_var_scales(tbl_data, col_ind_var, dimension, schema_madlib,
x_mean_table=None, set_zero_std_to_one=False):
"""
    Compute the mean and standard deviation for each dimension of an array
    stored in a column.
@param:
tbl_data,
col_ind_var,
dimension,
schema_madlib,
        x_mean_table (optional, default is None. If set to a valid string,
            the output is also written to a temp table of that name)
        set_zero_std_to_one (optional, default is False. If set to True,
            standard deviation values of 0.0 will be set to 1.0)
Returns:
Dictionary with keys 'mean' and 'std' each with a value of an array of
the mean and std. dev values respectively.
"""
create_table = ""
if x_mean_table:
create_table = "CREATE TEMP TABLE {0} AS ".format(x_mean_table)
# If we want to set features that have standard deviation value of 0
# to 1, we must call a different UDA. We must set standard deviation
# of 0 to 1 for algorithms such as neural networks. But for algorithms
# such as elastic_net, we don't have to do that since the corresponding
# coefficient itself is set to 0 (thus ignoring that particular feature
# altogether).
if set_zero_std_to_one:
scaling_uda_name = 'utils_var_scales_non_zero_std'
else:
scaling_uda_name = 'utils_var_scales'
query = """
{create_table}
SELECT (f).*
FROM (
SELECT
{schema_madlib}.__utils_var_scales_result(
{schema_madlib}.{scaling_uda_name}({col_ind_var}, {dimension})) as f
FROM
{tbl_data}
) q2"""
x_scales = plpy.execute(query.format(**locals()))
if x_mean_table:
x_scales = plpy.execute("""
SELECT * FROM {x_mean_table}
""".format(**locals()))
x_scaled_vals = dict()
x_scaled_vals["mean"] = mad_vec(x_scales[0]["mean"], text=False)
x_scaled_vals["std"] = mad_vec(x_scales[0]["std"], text=False)
return x_scaled_vals
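# A minimal usage sketch (illustrative only; the table 'pat_data' and its
# array column 'x' of length 3 are hypothetical, and the call can only run
# inside PL/Python, where plpy and the MADlib schema are available):
#
#   scales = utils_ind_var_scales('pat_data', 'x', 3, 'madlib')
#   x_mean = scales['mean']   # e.g. [1.2, 0.5, 7.0]
#   x_std = scales['std']     # e.g. [0.4, 0.1, 2.3]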
# ========================================================================
def utils_ind_var_scales_grouping(tbl_data, col_ind_var, dimension,
schema_madlib, grouping_col, x_mean_table,
set_zero_std_to_one=False,
create_temp_table=True):
"""
    Compute the mean and standard deviation for each dimension of an array
    stored in a column. Creates a table containing the mean (array) and std
    of each dimension of the independent variable, for each group.
@param:
tbl_data,
col_ind_var,
dimension,
schema_madlib,
grouping_col,
x_mean_table,
        set_zero_std_to_one (optional, default is False. If set to True,
            standard deviation values of 0.0 will be set to 1.0)
        create_temp_table (optional, default is True. If set to False,
            x_mean_table is created as a persistent table instead of a
            temp table)
    Returns:
        None. The mean (array) and std (array) for each group are written
        to x_mean_table.
"""
# If we want to set features that have standard deviation value of 0
# to 1, we must call a different UDA. We must set standard deviation
# of 0 to 1 for algorithms such as neural networks. But for algorithms
# such as elastic_net, we don't have to do that since the corresponding
# coefficient itself is set to 0 (thus ignoring that particular feature
# altogether).
if set_zero_std_to_one:
scaling_uda_name = 'utils_var_scales_non_zero_std'
else:
scaling_uda_name = 'utils_var_scales'
group_col = _cast_if_null(grouping_col, unique_string('grp_col'))
create_table_command = "CREATE TEMP TABLE" if create_temp_table else \
"CREATE TABLE"
    plpy.execute(
"""
{create_table_command} {x_mean_table} AS
SELECT {group_col}, (f).*
FROM (
SELECT {group_col},
{schema_madlib}.__utils_var_scales_result(
{schema_madlib}.{scaling_uda_name}({col_ind_var}, {dimension})) as f
FROM
{tbl_data}
GROUP BY {group_col}
) q2
""".format(**locals()))
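# A minimal usage sketch (hypothetical names; runs only inside PL/Python).
# Unlike the non-grouping variant, the result is not returned but written
# to x_mean_table:
#
#   utils_ind_var_scales_grouping('pat_data', 'x', 3, 'madlib',
#                                 grouping_col='state',
#                                 x_mean_table='pat_x_scales')
#   # 'pat_x_scales' now holds one row per distinct 'state', with columns
#   # state, mean (double precision[]) and std (double precision[])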
# ========================================================================
def __utils_dep_var_scale(schema_madlib, tbl_data, col_ind_var,
col_dep_var):
"""
    Compute the mean and standard deviation of the dependent variable, which
    is a scalar in ridge and lasso. The std is fixed at 1, so the dependent
    variable is centered but not rescaled.
    This function is also used in lasso.
Parameters:
schema_madlib -- madlib schema
tbl_data -- original data
col_ind_var -- independent variables column
col_dep_var -- dependent variable column
"""
y_scale = plpy.execute(
"""
SELECT
avg(CASE WHEN NOT {schema_madlib}.array_contains_null({col_ind_var})
THEN {col_dep_var} END) AS mean,
1 AS std
FROM {tbl_data}
""".format(**locals()))[0]
return y_scale
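# A minimal usage sketch (hypothetical names; runs only inside PL/Python).
# Rows whose independent-variable array contains a NULL are excluded from
# the mean, and std is pinned to 1, so y is centered but never rescaled:
#
#   y_scale = __utils_dep_var_scale('madlib', 'pat_data', 'x', 'y')
#   # y_scale['mean'] -> average of y over rows with a NULL-free x
#   # y_scale['std']  -> 1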
# ========================================================================
def __utils_dep_var_scale_grouping(y_mean_table, tbl_data, grouping_col,
family, schema_madlib=None,
col_ind_var=None, col_dep_var=None):
"""
The mean and standard deviation for each element of the dependent variable,
w.r.t a group, which is a scalar in ridge and lasso.
The output will be stored in a temp table: a mean array and a std array,
for each group.
    If the family is binomial, the mean and std for each group are set to 0
    and 1, respectively.
This function is also used in lasso.
Parameters:
y_mean_table -- name of the output table to write into
tbl_data -- input table
grouping_col -- the columns to group the data on
family -- if family is Gaussian, ALL following parameters must be defined
schema_madlib -- madlib schema
col_ind_var -- independent variables column
col_dep_var -- dependent variable column
"""
group_col = _cast_if_null(grouping_col, unique_string('grp_col'))
if family == 'binomial':
mean_str = '0'
else:
# If the family is Gaussian, schema_madlib, col_ind_var and
# col_dep_var must be passed along.
if schema_madlib is None or col_ind_var is None or col_dep_var is None:
            plpy.error("Schema name, independent column, and dependent column names are required.")
mean_str = (' avg(CASE WHEN NOT {0}.array_contains_null({1}) THEN {2} END) '.
format(schema_madlib, col_ind_var, col_dep_var))
plpy.execute(
"""
CREATE TEMP TABLE {y_mean_table} AS
SELECT {group_col},
{mean_str} AS mean,
1 AS std
FROM {tbl_data}
GROUP BY {group_col}
""".format(**locals()))
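# A minimal usage sketch (hypothetical names; runs only inside PL/Python).
# For a Gaussian family the per-group mean of y is computed; for 'binomial'
# the mean is pinned to 0, i.e. no centering is done for classification:
#
#   __utils_dep_var_scale_grouping('y_means', 'pat_data', 'state', 'gaussian',
#                                  schema_madlib='madlib',
#                                  col_ind_var='x', col_dep_var='y')
#   # temp table 'y_means' now has columns: state, mean, std (always 1)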
# ========================================================================
def __utils_normalize_data_grouping(y_decenter=True, **kwargs):
"""
    Normalize the independent and dependent variables using the means and
    standard deviations calculated in utils_ind_var_scales and
    __utils_dep_var_scale.
    Compute the scaled variables by: scaled_value = (origin_value - mean) / std;
    special care is needed if std is zero.
The output is a table with scaled independent and dependent variables,
based on mean and std for each group. This function is also used in lasso.
Parameters:
tbl_data -- original data
col_ind_var -- independent variables column
dimension -- length of independent variable array
col_dep_var -- dependent variable column
tbl_ind_scales -- independent variables scales array
tbl_dep_scale -- dependent variable scale
tbl_data_scaled -- scaled data result
col_ind_var_norm_new -- create a new name for the scaled array
to be compatible with array[...] expressions
x_mean_table -- name of the table containing mean of 'x' for each group
y_mean_table -- name of the table containing mean of 'y' for each group
grouping_col -- columns to group the data on
"""
group_col = kwargs.get('grouping_col')
group_col_list = split_quoted_delimited_str(group_col)
# If more than one column was specified for grouping, prefix each column
# name with '{tbl_data}.' to avoid ambiguity.
select_grouping_cols = get_table_qualified_col_str(kwargs.get('tbl_data'),
group_col_list)
x_mean_join_clause = ''
y_mean_join_clause = ''
if kwargs.get('x_mean_table'):
group_where_x = _check_groups(kwargs.get('tbl_data'), '__x__',
group_col_list)
x_mean_join_clause = "INNER JOIN {0} AS __x__ ON {1}".format(
kwargs.get('x_mean_table'), group_where_x)
if kwargs.get('y_mean_table'):
group_where_y = _check_groups(kwargs.get('tbl_data'), '__y__',
group_col_list)
y_mean_join_clause = "INNER JOIN {0} AS __y__ ON {1}".format(
kwargs.get('y_mean_table'), group_where_y)
    ydecenter_str = "- __y__.mean" if y_decenter else ""
weights_str = ", {weights}".format(**kwargs) if 'weights' in kwargs else ""
plpy.execute("""
CREATE TEMP TABLE {tbl_data_scaled}
m4_ifdef(`__POSTGRESQL__', `', `WITH (appendonly=true)')
AS
SELECT
({schema_madlib}.utils_normalize_data({col_ind_var},
__x__.mean::double precision[],
__x__.std::double precision[]))
AS {col_ind_var_norm_new},
({col_dep_var} {ydecenter_str}) AS {col_dep_var_norm_new},
{select_grouping_cols}
{weights_str}
FROM {tbl_data}
{x_mean_join_clause}
        {y_mean_join_clause}
        """.format(ydecenter_str=ydecenter_str,
                   x_mean_join_clause=x_mean_join_clause,
                   y_mean_join_clause=y_mean_join_clause,
                   select_grouping_cols=select_grouping_cols,
                   weights_str=weights_str,
                   **kwargs))
return None
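# The scaling rule the SQL above delegates to utils_normalize_data is, per
# dimension, scaled = (x - mean) / std. A pure-Python sketch of that rule
# (illustrative only, not used by the SQL path), assuming the "special care"
# for a zero std means treating it as 1:
#
#   def _scale(x, mean, std):
#       return [(xi - mi) / (si if si != 0 else 1.0)
#               for xi, mi, si in zip(x, mean, std)]
#
#   # _scale([2.0, 4.0], [1.0, 4.0], [0.5, 0.0]) -> [2.0, 0.0]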
# ========================================================================
def __utils_normalize_data(y_decenter=True, **kwargs):
"""
    Normalize the independent and dependent variables using the means and
    standard deviations calculated in utils_ind_var_scales and
    __utils_dep_var_scale.
    Compute the scaled variables by: scaled_value = (origin_value - mean) / std;
    special care is needed if std is zero.
The output is a table with scaled independent and dependent variables.
This function is also used in lasso.
Parameters:
tbl_data -- original data
col_ind_var -- independent variables column
dimension -- length of independent variable array
col_dep_var -- dependent variable column
tbl_ind_scales -- independent variables scales array
tbl_dep_scale -- dependent variable scale
tbl_data_scaled -- scaled data result
col_ind_var_norm_new -- create a new name for the scaled array
to be compatible with array[...] expressions
"""
ydecenter_str = "- {y_mean}".format(**kwargs) if y_decenter else ""
weights_str = ", {weights}".format(**kwargs) if 'weights' in kwargs else ""
plpy.execute(
"""
CREATE TEMP TABLE {tbl_data_scaled}
m4_ifdef(`__POSTGRESQL__', `', `WITH (appendonly=true)')
AS
SELECT
({schema_madlib}.utils_normalize_data(
{col_ind_var},
'{x_mean_str}'::double precision[],
'{x_std_str}'::double precision[]))
AS {col_ind_var_norm_new},
({col_dep_var} {ydecenter_str}) AS {col_dep_var_norm_new}
{weights_str}
FROM {tbl_data}
""".format(ydecenter_str=ydecenter_str,
weights_str=weights_str,
**kwargs))
return None
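# A minimal usage sketch (all names hypothetical; runs only inside
# PL/Python). Here the scales are passed as array-literal strings rather
# than joined from a table, since one mean/std pair covers the whole relation:
#
#   __utils_normalize_data(tbl_data='pat_data', col_ind_var='x',
#                          col_dep_var='y', schema_madlib='madlib',
#                          x_mean_str='{1.2,0.5}', x_std_str='{0.4,0.1}',
#                          y_mean='3.7', tbl_data_scaled='pat_data_scaled',
#                          col_ind_var_norm_new='x_norm',
#                          col_dep_var_norm_new='y_norm')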
# ========================================================================
def __utils_cv_preprocess(kwargs):
"""
Some common processes used in both ridge and lasso cross validation functions:
copy data if needed,
update name space,
check the validity of fold_num,
create table to store all the fitting coefficients
"""
# table containing the data, which will be
# split into training and validation parts
data_tbl = kwargs["data_tbl"]
# whether the user provides a unique ID for each row
# if not, then it is None
data_id = kwargs["data_id"]
# whether the ID provided by user is random
id_is_random = kwargs["id_is_random"]
col_ind_var = kwargs["col_ind_var"]
col_dep_var = kwargs["col_dep_var"]
# new name for dependent column due to data copy & split
col_dep_var_cv_new = unique_string()
# new name for dependent column due to normalization
col_dep_var_norm_new = unique_string()
    # number of folds for cross validation, default: 10
fold_num = kwargs["fold_num"]
    # how many folds will actually be used, default: 10.
    # If 1, it is just a single validation.
upto_fold = kwargs["upto_fold"]
    # if the data needs to be copied,
    # this is the copied table name
tbl_all_data = unique_string()
# table name before normalization
tbl_inter = unique_string()
# table name for training
tbl_train = unique_string()
# table name for validation
tbl_valid = unique_string()
# column name for random id
col_random_id = unique_string()
# table for random ID mapping, used when
# data_id is not None and id_is_random is False
tbl_random_id = unique_string()
# accumulate the error information
tbl_accum_error = unique_string()
# independent variable (array) scales
    # including means and stds
tbl_ind_scales = unique_string()
# dependent variable scale including mean and std
tbl_dep_scale = unique_string()
# table to store fitting coefficients from
# all validations for all parameter values
tbl_coef = unique_string()
# new name for independent column due to data copy & split
col_ind_var_cv_new = unique_string()
# new name for independent column due to normalization
col_ind_var_norm_new = unique_string()
kwargs.update(dict(tbl_accum_error=tbl_accum_error,
tbl_all_data=tbl_all_data,
tbl_inter=tbl_inter,
tbl_train=tbl_train,
tbl_valid=tbl_valid,
tbl_random_id=tbl_random_id,
col_random_id=col_random_id,
tbl_ind_scales=tbl_ind_scales,
tbl_dep_scale=tbl_dep_scale,
col_ind_var_cv_new=col_ind_var_cv_new,
col_ind_var_norm_new=col_ind_var_norm_new,
col_dep_var_cv_new=col_dep_var_cv_new,
col_dep_var_norm_new=col_dep_var_norm_new,
tbl_coef=tbl_coef))
# data_cols = [col_ind_var, col_dep_var]
if data_id is None:
        # unique ID column is not given; we have to copy the data and create an ID.
        # The CV function cannot be used here, because of possible array expressions
        __utils_cv_copy_data_with_id(data_tbl, col_dep_var, col_dep_var_cv_new,
                                     col_ind_var, col_ind_var_cv_new,
                                     tbl_all_data, col_random_id)
tbl_used = tbl_all_data
kwargs["col_ind_var_cp_new"] = col_ind_var_cv_new
kwargs["col_dep_var_cp_new"] = col_dep_var_cv_new
elif id_is_random:
# unique ID column is given and is random
tbl_used = data_tbl
col_random_id = data_id
kwargs["col_random_id"] = data_id
kwargs["col_ind_var_cp_new"] = col_ind_var
kwargs["col_dep_var_cp_new"] = col_dep_var
else:
# the provided unique ID is not random, create
# a table mapping the given ID to a random ID
__cv_generate_random_id(data_tbl, data_id, tbl_random_id, col_random_id)
tbl_used = data_tbl
kwargs["col_ind_var_cp_new"] = col_ind_var
kwargs["col_dep_var_cp_new"] = col_dep_var
# original data row number
row_num = plpy.execute("select count(*) as row_num from {data_tbl}".format(**kwargs))[0]["row_num"]
# length of the independent variable array
dimension = plpy.execute("select max(array_upper({col_ind_var},1)) as dimension from {data_tbl}".format(**kwargs))[0]["dimension"]
kwargs.update(dict(tbl_use=tbl_used, row_num=row_num, dimension=dimension))
# table to append all fitting results
# which are distinguished by id
plpy.execute("""
drop table if exists {tbl_coef};
create temp table {tbl_coef} (id integer, coef double precision[], intercept double precision)
""".format(**kwargs))
return None
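# A minimal sketch of the kwargs contract (hypothetical values; runs only
# inside PL/Python): the caller supplies the user-facing keys, and this
# function adds the generated intermediate table/column names in place:
#
#   args = dict(data_tbl='pat_data', data_id=None, id_is_random=False,
#               col_ind_var='x', col_dep_var='y', fold_num=10, upto_fold=10)
#   __utils_cv_preprocess(args)
#   # args now also carries tbl_train, tbl_valid, tbl_coef, row_num,
#   # dimension, ... for the cross-validation driver to use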
# ========================================================================
def __utils_accumulate_error(accum_count, tbl_accum_error, param_value, error):
"""
Function needed by both ridge and lasso cross validation functions:
accumulate measured errors from each validation for each lambda value.
"""
if accum_count == 1:
plpy.execute("""
CREATE TEMP TABLE {tbl_accum_error} (
lambda double precision,
mean_squared_error double precision)
""".format(tbl_accum_error=tbl_accum_error))
plpy.execute("insert into {tbl_accum_error} values ({param_value}, {error})".
format(tbl_accum_error=tbl_accum_error,
param_value=param_value, error=error))
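# A minimal usage sketch (hypothetical values; runs only inside PL/Python):
# the first call creates the accumulator table, later calls only append,
# collecting one row per (lambda, fold) validation:
#
#   for count, err in enumerate(fold_errors, start=1):
#       __utils_accumulate_error(count, 'err_accum', 0.1, err)
#   # 'err_accum' now holds len(fold_errors) rows for lambda = 0.1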
# ========================================================================
def __utils_cv_produce_col_name_string(col_dep_var, col_dep_var_cv_new,
col_ind_var, col_ind_var_cv_new):
"""
    Given the dependent and independent column names and their new names,
    form a single SELECT-list string, so that we can select only the
    necessary data when copying is inevitable.
    @param col_dep_var Name of dependent variable
    @param col_dep_var_cv_new A single string to rename the dependent variable
    @param col_ind_var Name of independent variable, might be an
        array expression
    @param col_ind_var_cv_new A single string to rename the independent
        variable, so that we can refer to the possible array expression
"""
    return (col_dep_var + " as " + col_dep_var_cv_new + ", " +
            col_ind_var + " as " + col_ind_var_cv_new)
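# For example (hypothetical names), the returned SELECT-list fragment is:
#
#   __utils_cv_produce_col_name_string('y', 'y_new', 'array[x1, x2]', 'x_new')
#   # -> 'y as y_new, array[x1, x2] as x_new'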
# ========================================================================
def __utils_cv_copy_data_with_id(rel_origin, col_dep_var, col_dep_var_cv_new,
col_ind_var, col_ind_var_cv_new, rel_copied,
random_id):
"""
    If the user does not provide an ID column, the data table has to be
    copied, and a random ID column is created along with it.
    @param rel_origin Original table name, a string
    @param col_dep_var Name of dependent variable
    @param col_dep_var_cv_new A single string to rename the dependent variable
    @param col_ind_var Name of independent variable, might be an
        array expression
    @param col_ind_var_cv_new A single string to rename the independent
        variable, so that we can refer to the possible array expression
    @param rel_copied Name of the table that copies the data from the original table
    @param random_id Column name for the random unique ID in the newly created copied table
"""
# We want to select only the columns that will be used in the computation.
col_string = __utils_cv_produce_col_name_string(col_dep_var, col_dep_var_cv_new,
col_ind_var, col_ind_var_cv_new)
__cv_copy_data_with_id_compute(rel_origin, col_string, rel_copied, random_id)
return None
# ========================================================================
def __utils_cv_split_data_using_id_col(rel_source, col_dep_var, col_dep_var_cv_new,
col_ind_var,
col_ind_var_cv_new, col_id, row_num,
rel_train, rel_valid, fold_num, which_fold):
"""
    A random ID column exists (either present originally or created during
    copying); split the data into training and validation parts.
@param rel_source Name of data source table
    @param col_dep_var Name of dependent variable
    @param col_dep_var_cv_new A single string to rename the dependent variable
    @param col_ind_var Name of independent variable, might be an
        array expression
    @param col_ind_var_cv_new A single string to rename the independent
        variable, so that we can refer to the possible array expression
    @param col_id Name of the unique ID column
    @param row_num Total number of rows
    @param rel_train Name of the training data table
    @param rel_valid Name of the validation data table
    @param fold_num Number of folds for cross validation
    @param which_fold Which fold to use as the validation part
"""
col_string = __utils_cv_produce_col_name_string(col_dep_var, col_dep_var_cv_new, col_ind_var, col_ind_var_cv_new)
__cv_split_data_using_id_col_compute(rel_source, col_string, col_id, row_num,
rel_train, rel_valid, fold_num, which_fold)
return None
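# A minimal usage sketch (hypothetical names; runs only inside PL/Python);
# the split itself is delegated to __cv_split_data_using_id_col_compute in
# cv_utils. With row_num=100 and fold_num=10, fold 1 becomes the validation
# part and the other nine folds the training part:
#
#   __utils_cv_split_data_using_id_col('pat_data', 'y', 'y_new',
#                                      'array[x1, x2]', 'x_new', 'rid',
#                                      100, 'tbl_train', 'tbl_valid', 10, 1)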
# ========================================================================
def __utils_cv_split_data_using_id_tbl(rel_origin, col_dep_var, col_dep_var_cv_new,
col_ind_var,
col_ind_var_cv_new, rel_random_id,
random_id, origin_id, row_num, rel_train,
rel_valid, fold_num, which_fold):
"""
Split the data table using a random ID mapping table
A unique ID column exists in the original table, but it is not randomly assigned.
Thus a table that maps this non-random ID to a real random ID has been created by
__cv_generate_random_id.
@param rel_origin Name of the original data table
    @param col_dep_var Name of dependent variable
    @param col_dep_var_cv_new A single string to rename the dependent variable
    @param col_ind_var Name of independent variable, might be an
        array expression
    @param col_ind_var_cv_new A single string to rename the independent
        variable, so that we can refer to the possible array expression
    @param rel_random_id Name of the random ID mapping table
    @param random_id Name of the random ID column in the table rel_random_id
    @param origin_id Name of the original non-random ID column in rel_origin and rel_random_id
    @param row_num Total number of rows
    @param rel_train Name of the training data table
    @param rel_valid Name of the validation data table
    @param fold_num Number of folds for cross validation
    @param which_fold Which fold to use as the validation part
"""
col_string = __utils_cv_produce_col_name_string(col_dep_var, col_dep_var_cv_new,
col_ind_var, col_ind_var_cv_new)
__cv_split_data_using_id_tbl_compute(rel_origin, col_string, rel_random_id,
random_id, origin_id, row_num, rel_train,
rel_valid, fold_num, which_fold)
return None
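# A minimal usage sketch (hypothetical names; runs only inside PL/Python):
# the same split as above, except fold membership is decided through the
# ID-mapping table produced by __cv_generate_random_id:
#
#   __utils_cv_split_data_using_id_tbl('pat_data', 'y', 'y_new',
#                                      'array[x1, x2]', 'x_new',
#                                      'rid_map', 'rid', 'orig_id',
#                                      100, 'tbl_train', 'tbl_valid', 10, 1)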