import plpy
from utilities.utilities import unique_string, _cast_if_null
from validation.cv_utils import __cv_copy_data_with_id_compute
from validation.cv_utils import __cv_split_data_using_id_col_compute
from validation.cv_utils import __cv_split_data_using_id_tbl_compute
from validation.cv_utils import __cv_generate_random_id
from utilities.utilities import __mad_version
from utilities.utilities import _check_groups
from utilities.utilities import get_table_qualified_col_str
from utilities.utilities import split_quoted_delimited_str

version_wrapper = __mad_version()
mad_vec = version_wrapper.select_vecfunc()

# ========================================================================

def utils_ind_var_scales(tbl_data, col_ind_var, dimension, schema_madlib,
                         x_mean_table=None, set_zero_std_to_one=False):
    """
    Compute the mean and standard deviation for each dimension of an array
    stored in a column.
    @param:
        tbl_data,
        col_ind_var,
        dimension,
        schema_madlib,
        x_mean_table, (optional, default is None. If set to a valid string,
                       the output is also dumped to a temp table of that name)
        set_zero_std_to_one (optional, default is False. If set to True,
                             standard deviation values of 0.0 are set to 1.0)

    Returns:
        Dictionary with keys 'mean' and 'std', each mapping to an array of
        the mean and std. dev. values respectively.
    """
    create_table = ""
    if x_mean_table:
        create_table = "CREATE TEMP TABLE {0} AS ".format(x_mean_table)
    # If we want to set features that have a standard deviation of 0 to 1,
    # we must call a different UDA. Setting a standard deviation of 0 to 1
    # is required for algorithms such as neural networks. For algorithms
    # such as elastic_net we don't have to do that, since the corresponding
    # coefficient itself is set to 0 (thus ignoring that particular feature
    # altogether).
    if set_zero_std_to_one:
        scaling_uda_name = 'utils_var_scales_non_zero_std'
    else:
        scaling_uda_name = 'utils_var_scales'
    query = """
        {create_table}
        SELECT (f).*
        FROM (
            SELECT
                {schema_madlib}.__utils_var_scales_result(
                    {schema_madlib}.{scaling_uda_name}({col_ind_var}, {dimension})) as f
            FROM
                {tbl_data}
        ) q2"""
    x_scales = plpy.execute(query.format(**locals()))
    if x_mean_table:
        x_scales = plpy.execute("""
            SELECT * FROM {x_mean_table}
            """.format(**locals()))
    x_scaled_vals = dict()
    x_scaled_vals["mean"] = mad_vec(x_scales[0]["mean"], text=False)
    x_scaled_vals["std"] = mad_vec(x_scales[0]["std"], text=False)
    return x_scaled_vals
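
# Example (hypothetical table and column names, for illustration): for a
# table 'src_data' with an array column 'x' of length 3, the call
#
#     scales = utils_ind_var_scales('src_data', 'x', 3, 'madlib')
#
# issues, roughly,
#
#     SELECT (f).*
#     FROM (
#         SELECT madlib.__utils_var_scales_result(
#                    madlib.utils_var_scales(x, 3)) as f
#         FROM src_data
#     ) q2
#
# and returns {'mean': [m1, m2, m3], 'std': [s1, s2, s3]}.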
# ========================================================================


def utils_ind_var_scales_grouping(tbl_data, col_ind_var, dimension,
                                  schema_madlib, grouping_col, x_mean_table,
                                  set_zero_std_to_one=False,
                                  create_temp_table=True):
    """
    Compute the mean and standard deviation for each dimension of an array
    stored in a column. Creates a table containing the mean (array) and std
    of each dimension of the independent variable, for each group.
    @param:
        tbl_data,
        col_ind_var,
        dimension,
        schema_madlib,
        grouping_col,
        x_mean_table,
        set_zero_std_to_one: (optional, default is False. If set to True,
                              standard deviation values of 0.0 are set to 1.0)
        create_temp_table: If set to False, create a persistent table
                           instead of a temp table for x_mean; otherwise
                           create a temp table.

    Returns:
        None. The mean and std. dev. values for each group are written to
        x_mean_table.
    """
    # If we want to set features that have a standard deviation of 0 to 1,
    # we must call a different UDA. Setting a standard deviation of 0 to 1
    # is required for algorithms such as neural networks. For algorithms
    # such as elastic_net we don't have to do that, since the corresponding
    # coefficient itself is set to 0 (thus ignoring that particular feature
    # altogether).
    if set_zero_std_to_one:
        scaling_uda_name = 'utils_var_scales_non_zero_std'
    else:
        scaling_uda_name = 'utils_var_scales'
    group_col = _cast_if_null(grouping_col, unique_string('grp_col'))
    create_table_command = "CREATE TEMP TABLE" if create_temp_table else \
        "CREATE TABLE"
    # The computed scales are written to x_mean_table; nothing is returned.
    plpy.execute(
        """
        {create_table_command} {x_mean_table} AS
        SELECT {group_col}, (f).*
        FROM (
            SELECT {group_col},
                   {schema_madlib}.__utils_var_scales_result(
                       {schema_madlib}.{scaling_uda_name}({col_ind_var}, {dimension})) as f
            FROM
                {tbl_data}
            GROUP BY {group_col}
        ) q2
        """.format(**locals()))
# ========================================================================


def __utils_dep_var_scale(schema_madlib, tbl_data, col_ind_var,
                          col_dep_var):
    """
    The mean and standard deviation of the dependent variable,
    which is a scalar in ridge and lasso.

    This function is also used in lasso.

    Parameters:
        schema_madlib -- madlib schema
        tbl_data -- original data
        col_ind_var -- independent variables column
        col_dep_var -- dependent variable column
    """
    y_scale = plpy.execute(
        """
        SELECT
            avg(CASE WHEN NOT {schema_madlib}.array_contains_null({col_ind_var})
                     THEN {col_dep_var} END) AS mean,
            1 AS std
        FROM {tbl_data}
        """.format(**locals()))[0]
    return y_scale
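
# A sketch of the returned row (hypothetical names; assumes the usual
# MADlib install schema): rows whose independent array contains a NULL are
# excluded from the mean via the CASE expression, and 'std' is fixed at 1
# so that later normalization only de-centers y without rescaling it:
#
#     y_scale = __utils_dep_var_scale('madlib', 'src_data', 'x', 'y')
#     # y_scale["mean"] -> avg of y over rows with no NULL in x
#     # y_scale["std"]  -> 1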
# ========================================================================


def __utils_dep_var_scale_grouping(y_mean_table, tbl_data, grouping_col,
                                   family, schema_madlib=None,
                                   col_ind_var=None, col_dep_var=None):
    """
    The mean and standard deviation of the dependent variable for each
    group; the dependent variable is a scalar in ridge and lasso.

    The output is stored in a temp table: a mean and a std value
    for each group.
    If the family is binomial, the mean and std for each group are set to
    0 and 1 respectively.

    This function is also used in lasso.

    Parameters:
        y_mean_table -- name of the output table to write into
        tbl_data -- input table
        grouping_col -- the columns to group the data on
        family -- if family is gaussian, ALL of the following parameters
                  must be defined
        schema_madlib -- madlib schema
        col_ind_var -- independent variables column
        col_dep_var -- dependent variable column
    """
    group_col = _cast_if_null(grouping_col, unique_string('grp_col'))
    if family == 'binomial':
        mean_str = '0'
    else:
        # If the family is gaussian, schema_madlib, col_ind_var and
        # col_dep_var must be passed along.
        if schema_madlib is None or col_ind_var is None or col_dep_var is None:
            plpy.error("Schema name, independent column and dependent "
                       "column names are required.")
        mean_str = (' avg(CASE WHEN NOT {0}.array_contains_null({1}) THEN {2} END) '.
                    format(schema_madlib, col_ind_var, col_dep_var))
    plpy.execute(
        """
        CREATE TEMP TABLE {y_mean_table} AS
        SELECT {group_col},
               {mean_str} AS mean,
               1 AS std
        FROM {tbl_data}
        GROUP BY {group_col}
        """.format(**locals()))
# ========================================================================


def __utils_normalize_data_grouping(y_decenter=True, **kwargs):
    """
    Normalize the independent and dependent variables using the means and
    standard deviations calculated in utils_ind_var_scales_grouping and
    __utils_dep_var_scale_grouping.

    Compute the scaled variables by: scaled_value = (origin_value - mean) / std;
    special care is needed if std is zero.

    The output is a table with scaled independent and dependent variables,
    based on the mean and std of each group. This function is also used in
    lasso.

    Parameters:
        tbl_data -- original data
        col_ind_var -- independent variables column
        dimension -- length of independent variable array
        col_dep_var -- dependent variable column
        tbl_ind_scales -- independent variables scales array
        tbl_dep_scale -- dependent variable scale
        tbl_data_scaled -- scaled data result
        col_ind_var_norm_new -- new name for the scaled array, so that it
                                is compatible with array[...] expressions
        x_mean_table -- name of the table containing the mean of 'x' for each group
        y_mean_table -- name of the table containing the mean of 'y' for each group
        grouping_col -- columns to group the data on
    """
    group_col = kwargs.get('grouping_col')
    group_col_list = split_quoted_delimited_str(group_col)
    # If more than one column was specified for grouping, prefix each column
    # name with '{tbl_data}.' to avoid ambiguity.
    select_grouping_cols = get_table_qualified_col_str(kwargs.get('tbl_data'),
                                                       group_col_list)
    x_mean_join_clause = ''
    y_mean_join_clause = ''
    if kwargs.get('x_mean_table'):
        group_where_x = _check_groups(kwargs.get('tbl_data'), '__x__',
                                      group_col_list)
        x_mean_join_clause = "INNER JOIN {0} AS __x__ ON {1}".format(
            kwargs.get('x_mean_table'), group_where_x)
    if kwargs.get('y_mean_table'):
        group_where_y = _check_groups(kwargs.get('tbl_data'), '__y__',
                                      group_col_list)
        y_mean_join_clause = "INNER JOIN {0} AS __y__ ON {1}".format(
            kwargs.get('y_mean_table'), group_where_y)
    ydecenter_str = "- __y__.mean" if y_decenter else ""
    weights_str = ", {weights}".format(**kwargs) if 'weights' in kwargs else ""
    plpy.execute("""
        CREATE TEMP TABLE {tbl_data_scaled}
        m4_ifdef(`__POSTGRESQL__', `', `WITH (appendonly=true)')
        AS
        SELECT
            ({schema_madlib}.utils_normalize_data({col_ind_var},
                                                  __x__.mean::double precision[],
                                                  __x__.std::double precision[]))
                AS {col_ind_var_norm_new},
            ({col_dep_var} {ydecenter_str}) AS {col_dep_var_norm_new},
            {select_grouping_cols}
            {weights_str}
        FROM {tbl_data}
        {x_mean_join_clause}
        {y_mean_join_clause}
        """.format(ydecenter_str=ydecenter_str, group_col=group_col,
                   x_mean_join_clause=x_mean_join_clause,
                   y_mean_join_clause=y_mean_join_clause,
                   select_grouping_cols=select_grouping_cols,
                   weights_str=weights_str,
                   **kwargs))
    return None
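
# A sketch of the expected call (hypothetical names; the scale tables are
# the ones produced by utils_ind_var_scales_grouping and
# __utils_dep_var_scale_grouping above):
#
#     __utils_normalize_data_grouping(
#         y_decenter=True, tbl_data='src_data', col_ind_var='x',
#         col_dep_var='y', col_ind_var_norm_new='x_norm',
#         col_dep_var_norm_new='y_norm', schema_madlib='madlib',
#         x_mean_table='x_mean_tbl', y_mean_table='y_mean_tbl',
#         tbl_data_scaled='src_data_scaled', grouping_col='g')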
# ========================================================================


def __utils_normalize_data(y_decenter=True, **kwargs):
    """
    Normalize the independent and dependent variables using the means and
    standard deviations calculated in utils_ind_var_scales and
    __utils_dep_var_scale.

    Compute the scaled variables by: scaled_value = (origin_value - mean) / std;
    special care is needed if std is zero.

    The output is a table with scaled independent and dependent variables.

    This function is also used in lasso.

    Parameters:
        tbl_data -- original data
        col_ind_var -- independent variables column
        dimension -- length of independent variable array
        col_dep_var -- dependent variable column
        tbl_ind_scales -- independent variables scales array
        tbl_dep_scale -- dependent variable scale
        tbl_data_scaled -- scaled data result
        col_ind_var_norm_new -- new name for the scaled array, so that it
                                is compatible with array[...] expressions
    """
    ydecenter_str = "- {y_mean}".format(**kwargs) if y_decenter else ""
    weights_str = ", {weights}".format(**kwargs) if 'weights' in kwargs else ""
    plpy.execute(
        """
        CREATE TEMP TABLE {tbl_data_scaled}
        m4_ifdef(`__POSTGRESQL__', `', `WITH (appendonly=true)')
        AS
        SELECT
            ({schema_madlib}.utils_normalize_data(
                {col_ind_var},
                '{x_mean_str}'::double precision[],
                '{x_std_str}'::double precision[]))
                AS {col_ind_var_norm_new},
            ({col_dep_var} {ydecenter_str}) AS {col_dep_var_norm_new}
            {weights_str}
        FROM {tbl_data}
        """.format(ydecenter_str=ydecenter_str,
                   weights_str=weights_str,
                   **kwargs))

    return None
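
# The scaling applied by {schema_madlib}.utils_normalize_data is
# elementwise (value - mean) / std; e.g. with x = [2.0, 4.0],
# mean = [1.0, 2.0] and std = [1.0, 2.0] the normalized array is
# [(2-1)/1, (4-2)/2] = [1.0, 1.0]. When a std entry is zero the UDF
# handles it specially, as noted in the docstring above.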
# ========================================================================


def __utils_cv_preprocess(kwargs):
    """
    Common steps used in both the ridge and lasso cross validation functions:

    copy the data if needed,
    update the name space,
    check the validity of fold_num,
    create the table that stores all the fitting coefficients
    """
    # table containing the data, which will be
    # split into training and validation parts
    data_tbl = kwargs["data_tbl"]
    # whether the user provides a unique ID for each row;
    # if not, then it is None
    data_id = kwargs["data_id"]
    # whether the ID provided by the user is random
    id_is_random = kwargs["id_is_random"]
    col_ind_var = kwargs["col_ind_var"]
    col_dep_var = kwargs["col_dep_var"]
    # new name for the dependent column due to data copy & split
    col_dep_var_cv_new = unique_string()
    # new name for the dependent column due to normalization
    col_dep_var_norm_new = unique_string()
    # number of folds for cross validation, default: 10
    fold_num = kwargs["fold_num"]
    # how many folds will actually be used, default: 10.
    # If 1, it is just a single validation.
    upto_fold = kwargs["upto_fold"]
    # if the data needs to be copied,
    # this is the copied table's name
    tbl_all_data = unique_string()
    # table name before normalization
    tbl_inter = unique_string()
    # table name for training
    tbl_train = unique_string()
    # table name for validation
    tbl_valid = unique_string()
    # column name for the random ID
    col_random_id = unique_string()
    # table for the random ID mapping, used when
    # data_id is not None and id_is_random is False
    tbl_random_id = unique_string()
    # accumulates the error information
    tbl_accum_error = unique_string()
    # independent variable (array) scales,
    # including means and stds
    tbl_ind_scales = unique_string()
    # dependent variable scale, including mean and std
    tbl_dep_scale = unique_string()
    # table to store fitting coefficients from
    # all validations for all parameter values
    tbl_coef = unique_string()
    # new name for the independent column due to data copy & split
    col_ind_var_cv_new = unique_string()
    # new name for the independent column due to normalization
    col_ind_var_norm_new = unique_string()
    kwargs.update(dict(tbl_accum_error=tbl_accum_error,
                       tbl_all_data=tbl_all_data,
                       tbl_inter=tbl_inter,
                       tbl_train=tbl_train,
                       tbl_valid=tbl_valid,
                       tbl_random_id=tbl_random_id,
                       col_random_id=col_random_id,
                       tbl_ind_scales=tbl_ind_scales,
                       tbl_dep_scale=tbl_dep_scale,
                       col_ind_var_cv_new=col_ind_var_cv_new,
                       col_ind_var_norm_new=col_ind_var_norm_new,
                       col_dep_var_cv_new=col_dep_var_cv_new,
                       col_dep_var_norm_new=col_dep_var_norm_new,
                       tbl_coef=tbl_coef))
    # data_cols = [col_ind_var, col_dep_var]
    if data_id is None:
        # A unique ID column is not given, so the data has to be copied and
        # an ID created. The CV function cannot be used here because of
        # possible array expressions.
        __utils_cv_copy_data_with_id(data_tbl, col_dep_var, col_dep_var_cv_new,
                                     col_ind_var, col_ind_var_cv_new,
                                     tbl_all_data, col_random_id)
        tbl_used = tbl_all_data
        kwargs["col_ind_var_cp_new"] = col_ind_var_cv_new
        kwargs["col_dep_var_cp_new"] = col_dep_var_cv_new
    elif id_is_random:
        # a unique ID column is given and is random
        tbl_used = data_tbl
        col_random_id = data_id
        kwargs["col_random_id"] = data_id
        kwargs["col_ind_var_cp_new"] = col_ind_var
        kwargs["col_dep_var_cp_new"] = col_dep_var
    else:
        # the provided unique ID is not random, so create
        # a table mapping the given ID to a random ID
        __cv_generate_random_id(data_tbl, data_id, tbl_random_id, col_random_id)
        tbl_used = data_tbl
        kwargs["col_ind_var_cp_new"] = col_ind_var
        kwargs["col_dep_var_cp_new"] = col_dep_var

    # original data row count
    row_num = plpy.execute("select count(*) as row_num from {data_tbl}".
                           format(**kwargs))[0]["row_num"]
    # length of the independent variable array
    dimension = plpy.execute("select max(array_upper({col_ind_var}, 1)) "
                             "as dimension from {data_tbl}".
                             format(**kwargs))[0]["dimension"]

    kwargs.update(dict(tbl_use=tbl_used, row_num=row_num, dimension=dimension))

    # table to which all fitting results are appended,
    # distinguished by id
    plpy.execute("""
        drop table if exists {tbl_coef};
        create temp table {tbl_coef} (id integer, coef double precision[],
                                      intercept double precision)
        """.format(**kwargs))
    return None
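
# After this call, kwargs carries everything the CV driver needs; the keys
# added or filled in by this function include:
#
#     kwargs['tbl_use']    # table actually read by the folds
#     kwargs['row_num']    # total number of rows in data_tbl
#     kwargs['dimension']  # length of the independent variable array
#     kwargs['tbl_coef']   # temp table (id, coef, intercept) collecting fits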
# ========================================================================


def __utils_accumulate_error(accum_count, tbl_accum_error, param_value, error):
    """
    Function needed by both the ridge and lasso cross validation functions:

    accumulate the measured errors from each validation for each lambda value.
    """
    if accum_count == 1:
        plpy.execute("""
            CREATE TEMP TABLE {tbl_accum_error} (
                lambda double precision,
                mean_squared_error double precision)
            """.format(tbl_accum_error=tbl_accum_error))
    plpy.execute("insert into {tbl_accum_error} values ({param_value}, {error})".
                 format(tbl_accum_error=tbl_accum_error,
                        param_value=param_value, error=error))
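
# Example (hypothetical table name): called once per (lambda, fold) pair;
# the first call creates the accumulator table, subsequent calls only
# insert:
#
#     __utils_accumulate_error(1, tbl_err, 0.1, 2.5)   # creates + inserts
#     __utils_accumulate_error(2, tbl_err, 0.1, 2.7)   # inserts only
#
# Grouping and averaging mean_squared_error per lambda presumably happens
# elsewhere in the CV driver.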
# ========================================================================


def __utils_cv_produce_col_name_string(col_dep_var, col_dep_var_cv_new,
                                       col_ind_var, col_ind_var_cv_new):
    """
    Combine the given column names (or expressions) and their new aliases
    into a single string, so that we can select only the necessary data
    when copying is inevitable.

    @param col_dep_var Name of the dependent variable
    @param col_dep_var_cv_new New alias for the dependent variable
    @param col_ind_var Name of the independent variable, might be an
        array expression
    @param col_ind_var_cv_new A single string to rename the independent
        variable, so that we can refer to the possible array expression
    """
    return (col_dep_var + " as " + col_dep_var_cv_new + ", " +
            col_ind_var + " as " + col_ind_var_cv_new)
# ========================================================================


def __utils_cv_copy_data_with_id(rel_origin, col_dep_var, col_dep_var_cv_new,
                                 col_ind_var, col_ind_var_cv_new, rel_copied,
                                 random_id):
    """
    If the user does not provide an ID column, the data table has to be
    copied, creating a random ID column at the same time.

    @param rel_origin Original table name, a string
    @param col_dep_var Name of the dependent variable
    @param col_dep_var_cv_new New alias for the dependent variable
    @param col_ind_var Name of the independent variable, might be an
        array expression
    @param col_ind_var_cv_new A single string to rename the independent
        variable, so that we can refer to the possible array expression
    @param rel_copied Name of the table that copies the data from the original table
    @param random_id Column name for the random unique ID in the newly created copied table
    """
    # We want to select only the columns that will be used in the computation.
    col_string = __utils_cv_produce_col_name_string(col_dep_var, col_dep_var_cv_new,
                                                    col_ind_var, col_ind_var_cv_new)
    __cv_copy_data_with_id_compute(rel_origin, col_string, rel_copied, random_id)
    return None
# ========================================================================


def __utils_cv_split_data_using_id_col(rel_source, col_dep_var, col_dep_var_cv_new,
                                       col_ind_var,
                                       col_ind_var_cv_new, col_id, row_num,
                                       rel_train, rel_valid, fold_num, which_fold):
    """
    When a random ID column exists (either originally or created during
    copying), split the data into training and validation parts.

    @param rel_source Name of the data source table
    @param col_dep_var Name of the dependent variable
    @param col_dep_var_cv_new New alias for the dependent variable
    @param col_ind_var Name of the independent variable, might be an
        array expression
    @param col_ind_var_cv_new A single string to rename the independent
        variable, so that we can refer to the possible array expression
    @param col_id Name of the unique ID column
    @param row_num Total number of rows
    @param rel_train Name of the training data table
    @param rel_valid Name of the validation data table
    @param fold_num Number of folds for cross validation
    @param which_fold Which fold will be used as the validation part?
    """
    col_string = __utils_cv_produce_col_name_string(col_dep_var, col_dep_var_cv_new,
                                                    col_ind_var, col_ind_var_cv_new)
    __cv_split_data_using_id_col_compute(rel_source, col_string, col_id, row_num,
                                         rel_train, rel_valid, fold_num, which_fold)
    return None
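
# A sketch of the enclosing CV loop this supports (hypothetical driver
# code): each fold in turn serves as the validation set, up to upto_fold
# folds:
#
#     for which_fold in range(1, upto_fold + 1):
#         __utils_cv_split_data_using_id_col(
#             tbl_used, col_dep_var, col_dep_var_cv_new,
#             col_ind_var, col_ind_var_cv_new, col_random_id, row_num,
#             tbl_train, tbl_valid, fold_num, which_fold)
#         # ... fit on tbl_train, measure the error on tbl_valid ...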
# ========================================================================


def __utils_cv_split_data_using_id_tbl(rel_origin, col_dep_var, col_dep_var_cv_new,
                                       col_ind_var,
                                       col_ind_var_cv_new, rel_random_id,
                                       random_id, origin_id, row_num, rel_train,
                                       rel_valid, fold_num, which_fold):
    """
    Split the data table using a random ID mapping table.

    A unique ID column exists in the original table, but it is not randomly
    assigned. Thus a table that maps this non-random ID to a real random ID
    has been created by __cv_generate_random_id.

    @param rel_origin Name of the original data table
    @param col_dep_var Name of the dependent variable
    @param col_dep_var_cv_new New alias for the dependent variable
    @param col_ind_var Name of the independent variable, might be an
        array expression
    @param col_ind_var_cv_new A single string to rename the independent
        variable, so that we can refer to the possible array expression
    @param rel_random_id Name of the random ID mapping table
    @param random_id Name of the random ID column in the table rel_random_id
    @param origin_id Name of the original non-random ID column in rel_origin and rel_random_id
    @param row_num Total number of rows
    @param rel_train Name of the training data table
    @param rel_valid Name of the validation data table
    @param fold_num Number of folds for cross validation
    @param which_fold Which fold will be used as the validation part?
    """
    col_string = __utils_cv_produce_col_name_string(col_dep_var, col_dep_var_cv_new,
                                                    col_ind_var, col_ind_var_cv_new)
    __cv_split_data_using_id_tbl_compute(rel_origin, col_string, rel_random_id,
                                         random_id, origin_id, row_num, rel_train,
                                         rel_valid, fold_num, which_fold)
    return None