| |
| import plpy |
| from utilities.utilities import unique_string, _cast_if_null |
| from validation.cv_utils import __cv_copy_data_with_id_compute |
| from validation.cv_utils import __cv_split_data_using_id_col_compute |
| from validation.cv_utils import __cv_split_data_using_id_tbl_compute |
| from validation.cv_utils import __cv_generate_random_id |
| from utilities.utilities import __mad_version |
| from utilities.utilities import split_quoted_delimited_str |
| |
# Module-level setup shared by the helpers in this file. mad_vec is applied to
# the 'mean' and 'std' results in __utils_ind_var_scales (called with
# text=False).
# NOTE(review): presumably select_vecfunc() picks a conversion routine suited
# to the running database version -- confirm in utilities.utilities.
| version_wrapper = __mad_version() |
| mad_vec = version_wrapper.select_vecfunc() |
| |
| # ======================================================================== |
| |
def __utils_ind_var_scales(tbl_data, col_ind_var, dimension, schema_madlib):
    """
    Compute the per-dimension mean and standard deviation of an array column.

    A single aggregate query is run over the whole of ``tbl_data`` and its
    only result row is returned.

    Parameters:
    tbl_data      -- table holding the data
    col_ind_var   -- array-valued column (or expression) to summarize
    dimension     -- length of the array, forwarded to utils_var_scales
    schema_madlib -- schema in which the MADlib SQL functions live

    Returns:
    The result row, whose 'mean' and 'std' entries have each been passed
    through mad_vec(..., text=False).
    """
    query = """
        SELECT (f).*
        FROM (
            SELECT
                {schema_madlib}.__utils_var_scales_result(
                    {schema_madlib}.utils_var_scales({col_ind_var}, {dimension})) as f
            FROM
                {tbl_data}
        ) q2
        """.format(schema_madlib=schema_madlib,
                   col_ind_var=col_ind_var,
                   dimension=dimension,
                   tbl_data=tbl_data)
    scales = plpy.execute(query)[0]
    # Convert both arrays with the module-level vector helper.
    for key in ("mean", "std"):
        scales[key] = mad_vec(scales[key], text=False)
    return scales
| # ======================================================================== |
| |
def __utils_ind_var_scales_grouping(tbl_data, col_ind_var, dimension,
                                    schema_madlib, grouping_col, x_mean_table):
    """
    Store the per-group mean and standard deviation of an array column.

    Creates the temp table ``x_mean_table`` holding, for every group, the
    fields of the utils_var_scales result for ``col_ind_var`` followed by the
    grouping expression.

    Parameters:
    tbl_data      -- table holding the data
    col_ind_var   -- array-valued column (or expression) to summarize
    dimension     -- length of the array, forwarded to utils_var_scales
    schema_madlib -- schema in which the MADlib SQL functions live
    grouping_col  -- grouping column(s); passed through _cast_if_null first
    x_mean_table  -- name of the temp table to create

    Returns:
    None
    """
    group_col = _cast_if_null(grouping_col, unique_string('grp_col'))
    create_sql = """
        CREATE TEMP TABLE {x_mean_table} AS
        SELECT (f).*, {group_col}
        FROM (
            SELECT {group_col},
                {schema_madlib}.__utils_var_scales_result(
                    {schema_madlib}.utils_var_scales({col_ind_var}, {dimension})) as f
            FROM
                {tbl_data}
            GROUP BY {group_col}
        ) q2
        """.format(x_mean_table=x_mean_table,
                   group_col=group_col,
                   schema_madlib=schema_madlib,
                   col_ind_var=col_ind_var,
                   dimension=dimension,
                   tbl_data=tbl_data)
    plpy.execute(create_sql)
| # ======================================================================== |
| |
def __utils_dep_var_scale(schema_madlib, tbl_data, col_ind_var,
                          col_dep_var):
    """
    Compute the scale (mean and std) of the scalar dependent variable.

    Only rows whose independent-variable array contains no NULL contribute
    to the mean. The std is the constant 1. Used by both ridge and lasso.

    Parameters:
    schema_madlib -- madlib schema
    tbl_data      -- original data
    col_ind_var   -- independent variables column
    col_dep_var   -- dependent variable column

    Returns:
    The single result row, with entries 'mean' and 'std'.
    """
    query = """
        SELECT
            avg(CASE WHEN NOT {schema_madlib}.array_contains_null({col_ind_var}) THEN {col_dep_var} END) AS mean,
            1 AS std
        FROM {tbl_data}
        """.format(schema_madlib=schema_madlib,
                   col_ind_var=col_ind_var,
                   col_dep_var=col_dep_var,
                   tbl_data=tbl_data)
    return plpy.execute(query)[0]
| # ======================================================================== |
| |
def __utils_dep_var_scale_grouping(y_mean_table, tbl_data, grouping_col,
        family, schema_madlib=None, col_ind_var=None, col_dep_var=None):
    """
    Store the per-group scale (mean and std) of the dependent variable.

    Creates the temp table ``y_mean_table`` with one row per group holding
    the grouping expression, ``mean`` and ``std``. The std is always 1. For
    the 'binomial' family the mean is the constant 0; for any other family
    it is the average of ``col_dep_var`` over rows whose ``col_ind_var``
    array contains no NULL. Used by both ridge and lasso.

    Parameters:
    y_mean_table  -- name of the output table to write into
    tbl_data      -- input table
    grouping_col  -- the columns to group the data on
    family        -- unless this is 'binomial', ALL following parameters
                     must be given
    schema_madlib -- madlib schema
    col_ind_var   -- independent variables column
    col_dep_var   -- dependent variable column

    Returns:
    None. plpy.error is called when a non-binomial family is requested
    without the three optional arguments.
    """
    group_col = _cast_if_null(grouping_col, unique_string('grp_col'))
    if family == 'binomial':
        mean_str = '0'
    else:
        # The averaging expression needs all three optional arguments.
        needed = (schema_madlib, col_ind_var, col_dep_var)
        if any(arg is None for arg in needed):
            plpy.error("Schema name, indpendent column and dependent column names required.")
        mean_str = ' avg(CASE WHEN NOT {0}.array_contains_null({1}) THEN {2} END) '.format(
            *needed)
    create_sql = """
        CREATE TEMP TABLE {y_mean_table} AS
        SELECT {group_col},
            {mean_str} AS mean,
            1 AS std
        FROM {tbl_data}
        GROUP BY {group_col}
        """.format(y_mean_table=y_mean_table,
                   group_col=group_col,
                   mean_str=mean_str,
                   tbl_data=tbl_data)
    plpy.execute(create_sql)
| # ======================================================================== |
| |
def __utils_normalize_data_grouping(y_decenter=True, **kwargs):
    """
    Create a table of normalized data using per-group scales.

    Each row of ``tbl_data`` is joined to its group's row in ``x_mean_table``
    and ``y_mean_table``. The independent-variable array is scaled by the SQL
    function utils_normalize_data with that group's mean and std arrays. The
    dependent variable has the group's mean subtracted when ``y_decenter`` is
    true and is copied unchanged otherwise. This function is also used in
    lasso.

    Parameters:
    y_decenter -- whether to subtract the group mean from the dependent
                  variable (default True)

    Keyword arguments read (any additional ones are ignored):
    schema_madlib        -- madlib schema
    tbl_data             -- original data
    col_ind_var          -- independent variables column
    col_dep_var          -- dependent variable column
    tbl_data_scaled      -- name of the temp table to create
    col_ind_var_norm_new -- name given to the scaled array column
    col_dep_var_norm_new -- name given to the (de-centered) dependent column
    x_mean_table -- name of the table containing mean of 'x' for each group
    y_mean_table -- name of the table containing mean of 'y' for each group
    grouping_col -- columns to group the data on

    Returns:
    None
    """
    tbl_data = kwargs['tbl_data']
    group_col_list = split_quoted_delimited_str(kwargs.get('grouping_col'))

    def _join_condition(alias):
        # Equality on every grouping column between the data and a scale table.
        return ' AND '.join(
            '{0}.{1}={2}.{1}'.format(tbl_data, grp, alias)
            for grp in group_col_list)

    group_where_x = _join_condition('__x__')
    group_where_y = _join_condition('__y__')

    # Every grouping column also exists in both joined scale tables, so each
    # one must be qualified with the data table. Prefixing the raw
    # comma-separated string only qualified the first column and left the
    # others ambiguous for multi-column grouping.
    group_col = ', '.join(
        '{0}.{1}'.format(tbl_data, grp) for grp in group_col_list)

    ydecenter_str = "- __y__.mean" if y_decenter else ""
    plpy.execute(
        """
        CREATE TEMP TABLE {tbl_data_scaled} AS
        SELECT
            ({schema_madlib}.utils_normalize_data({col_ind_var},
                __x__.mean::double precision[],
                __x__.std::double precision[]))
                AS {col_ind_var_norm_new},
            ({col_dep_var} {ydecenter_str}) AS {col_dep_var_norm_new},
            {group_col}
        FROM {tbl_data}
        INNER JOIN {x_mean_table} AS __x__ ON {group_where_x}
        INNER JOIN {y_mean_table} AS __y__ ON {group_where_y}
        """.format(ydecenter_str=ydecenter_str, group_col=group_col,
                   group_where_x=group_where_x, group_where_y=group_where_y,
                   **kwargs))
    return None
| # ======================================================================== |
| |
def __utils_normalize_data(y_decenter=True, **kwargs):
    """
    Create a table of normalized data using one global set of scales.

    The independent-variable array is scaled by the SQL function
    utils_normalize_data with the supplied mean and std arrays. The
    dependent variable has ``y_mean`` subtracted when ``y_decenter`` is true
    and is copied unchanged otherwise. This function is also used in lasso.

    Parameters:
    y_decenter -- whether to subtract y_mean from the dependent variable
                  (default True)

    Keyword arguments read (any additional ones are ignored):
    schema_madlib        -- madlib schema
    tbl_data             -- original data
    col_ind_var          -- independent variables column
    col_dep_var          -- dependent variable column
    x_mean_str           -- array literal of the independent-variable means
    x_std_str            -- array literal of the independent-variable stds
    y_mean               -- dependent-variable mean (only read when
                            y_decenter is true)
    tbl_data_scaled      -- name of the temp table to create
    col_ind_var_norm_new -- name given to the scaled array column
    col_dep_var_norm_new -- name given to the (de-centered) dependent column

    Returns:
    None
    """
    if y_decenter:
        ydecenter_str = "- {y_mean}".format(**kwargs)
    else:
        ydecenter_str = ""
    create_sql = """
        CREATE TEMP TABLE {tbl_data_scaled} AS
        SELECT
            ({schema_madlib}.utils_normalize_data({col_ind_var},
                '{x_mean_str}'::double precision[],
                '{x_std_str}'::double precision[]))
                AS {col_ind_var_norm_new},
            ({col_dep_var} {ydecenter_str}) AS {col_dep_var_norm_new}
        FROM {tbl_data}
        """.format(ydecenter_str=ydecenter_str, **kwargs)
    plpy.execute(create_sql)
| # ======================================================================== |
| |
def __utils_cv_preprocess(kwargs):
    """
    Common set-up used by both the ridge and lasso cross validation functions.

    ``kwargs`` is updated in place with:
      * freshly generated unique names for the working tables and columns
        (tbl_accum_error, tbl_all_data, tbl_inter, tbl_train, tbl_valid,
        tbl_random_id, col_random_id, tbl_ind_scales, tbl_dep_scale,
        tbl_coef, col_ind_var_cv_new, col_ind_var_norm_new,
        col_dep_var_cv_new, col_dep_var_norm_new);
      * col_ind_var_cp_new / col_dep_var_cp_new -- the column names to use
        after the optional copy step;
      * tbl_used -- the table the data should be read from afterwards;
      * row_num -- number of rows in data_tbl;
      * dimension -- largest upper bound of the independent-variable array.

    Depending on the ID information supplied, the data is copied together
    with a new ID column (no data_id), used as is (data_id is random), or a
    random-ID mapping table is generated (data_id is not random).

    Finally the empty temp table kwargs["tbl_coef"] (id, coef, intercept)
    is (re)created to collect the fitting coefficients.

    Required keys in kwargs:
    data_tbl     -- table containing the data to split
    data_id      -- name of a unique ID column, or None
    id_is_random -- whether the user-provided ID is random
    col_ind_var  -- independent variables column
    col_dep_var  -- dependent variable column
    fold_num     -- number of folds (looked up only, not validated here)
    upto_fold    -- number of folds actually used (looked up only)

    Returns:
    None
    """
    data_tbl = kwargs["data_tbl"]
    data_id = kwargs["data_id"]
    id_is_random = kwargs["id_is_random"]
    col_ind_var = kwargs["col_ind_var"]
    col_dep_var = kwargs["col_dep_var"]
    # Not used here, but a caller that forgot them should still fail
    # immediately with a KeyError.
    for required_key in ("fold_num", "upto_fold"):
        if required_key not in kwargs:
            raise KeyError(required_key)

    # One unique identifier per working table / column.
    generated_keys = (
        "col_dep_var_cv_new",    # dependent column after data copy & split
        "col_dep_var_norm_new",  # dependent column after normalization
        "tbl_all_data",          # copied data table (when a copy is needed)
        "tbl_inter",             # table before normalization
        "tbl_train",             # training table
        "tbl_valid",             # validation table
        "col_random_id",         # random id column
        "tbl_random_id",         # random-ID mapping table
        "tbl_accum_error",       # accumulated error information
        "tbl_ind_scales",        # independent variable means and stds
        "tbl_dep_scale",         # dependent variable mean and std
        "tbl_coef",              # fitting coefficients of all validations
        "col_ind_var_cv_new",    # independent column after data copy & split
        "col_ind_var_norm_new",  # independent column after normalization
    )
    names = dict((key, unique_string()) for key in generated_keys)
    kwargs.update(names)

    if data_id is None:
        # No unique ID column: copy the data and create the ID on the way.
        # The generic CV function cannot be used because the columns may be
        # array expressions. Keyword arguments are used so that every value
        # reaches the parameter it is meant for.
        __utils_cv_copy_data_with_id(
            rel_origin=data_tbl,
            col_dep_var=col_dep_var,
            col_dep_var_cv_new=names["col_dep_var_cv_new"],
            col_ind_var=col_ind_var,
            col_ind_var_cv_new=names["col_ind_var_cv_new"],
            rel_copied=names["tbl_all_data"],
            random_id=names["col_random_id"])
        tbl_used = names["tbl_all_data"]
        kwargs["col_ind_var_cp_new"] = names["col_ind_var_cv_new"]
        kwargs["col_dep_var_cp_new"] = names["col_dep_var_cv_new"]
    else:
        if id_is_random:
            # The provided ID is already random: use it directly.
            kwargs["col_random_id"] = data_id
        else:
            # The provided ID is not random: create a table mapping the
            # given ID to a random one.
            __cv_generate_random_id(data_tbl, data_id,
                                    names["tbl_random_id"],
                                    names["col_random_id"])
        tbl_used = data_tbl
        kwargs["col_ind_var_cp_new"] = col_ind_var
        kwargs["col_dep_var_cp_new"] = col_dep_var

    # Number of rows in the original data.
    row_num = plpy.execute(
        "select count(*) as row_num from {data_tbl}".format(
            data_tbl=data_tbl))[0]["row_num"]
    # Length of the independent variable array.
    dimension = plpy.execute(
        "select max(array_upper({col_ind_var},1)) as dimension from {data_tbl}".format(
            col_ind_var=col_ind_var, data_tbl=data_tbl))[0]["dimension"]

    kwargs.update(dict(tbl_used=tbl_used, row_num=row_num,
                       dimension=dimension))

    # Table collecting all fitting results, which are distinguished by id.
    plpy.execute("""
        drop table if exists {tbl_coef};
        create temp table {tbl_coef} (id integer, coef double precision[], intercept double precision)
        """.format(tbl_coef=names["tbl_coef"]))
    return None
| |
| ## ======================================================================== |
| |
def __utils_accumulate_error(accum_count, tbl_accum_error, param_value, error):
    """
    Record one measured validation error for one lambda value.

    Needed by both the ridge and lasso cross validation functions. When
    ``accum_count`` is 1 the temp table ``tbl_accum_error`` is created
    first; in every case one (param_value, error) row is then inserted.

    Parameters:
    accum_count     -- call counter; 1 triggers creation of the table
    tbl_accum_error -- name of the error accumulation table
    param_value     -- value stored in the 'lambda' column
    error           -- value stored in the 'mean_squared_error' column

    Returns:
    None
    """
    first_call = (accum_count == 1)
    if first_call:
        create_sql = "create temp table {tbl_accum_error} (lambda double precision, mean_squared_error double precision)".format(
            tbl_accum_error=tbl_accum_error)
        plpy.execute(create_sql)
    row = dict(tbl_accum_error=tbl_accum_error,
               param_value=param_value,
               error=error)
    insert_sql = "insert into {tbl_accum_error} values ({param_value}, {error})".format(**row)
    plpy.execute(insert_sql)
| |
| ## ======================================================================== |
| |
def __utils_cv_produce_col_name_string(col_dep_var, col_dep_var_cv_new,
                                       col_ind_var, col_ind_var_cv_new):
    """
    Build the SQL select list that renames the two columns used in the
    computation, so that only the necessary data is selected when copying
    is inevitable.

    @param col_dep_var Name of dependent variable
    @param col_dep_var_cv_new New name for the dependent variable
    @param col_ind_var Name of independent variable, might be an
        array expression
    @param col_ind_var_cv_new A single string to rename the independent
        variable, so that we can refer to the possible array expression

    @return The string
        "<col_dep_var> as <col_dep_var_cv_new>, <col_ind_var> as <col_ind_var_cv_new>"
    """
    # Build both parts in one expression. A `return` followed by a line that
    # merely starts with `+` ends at the first line and silently drops the
    # independent variable.
    return ", ".join([
        col_dep_var + " as " + col_dep_var_cv_new,
        col_ind_var + " as " + col_ind_var_cv_new,
    ])
| |
| # ======================================================================== |
| |
def __utils_cv_copy_data_with_id(rel_origin, col_dep_var, col_dep_var_cv_new,
                                 col_ind_var, col_ind_var_cv_new, rel_copied,
                                 random_id):
    """
    Copy the data table and give it a random ID column.

    Used when the user does not provide an ID column. Only the columns
    needed for the computation are selected; the actual work is delegated
    to __cv_copy_data_with_id_compute.

    @param rel_origin Original table name, a string
    @param col_dep_var Name of dependent variable
    @param col_dep_var_cv_new New name for the dependent variable
    @param col_ind_var Name of independent variable, might be an
        array expression
    @param col_ind_var_cv_new A single string to rename the independent
        variable, so that we can refer to the possible array expression
    @param rel_copied Name of the table that copies the data from the
        original table
    @param random_id Column name for the random unique ID in the newly
        created copied table
    """
    # Column list string produced by __utils_cv_produce_col_name_string.
    select_list = __utils_cv_produce_col_name_string(
        col_dep_var, col_dep_var_cv_new, col_ind_var, col_ind_var_cv_new)
    __cv_copy_data_with_id_compute(rel_origin, select_list, rel_copied,
                                   random_id)
| |
| # ======================================================================== |
| |
def __utils_cv_split_data_using_id_col(rel_source, col_dep_var,
                                       col_dep_var_cv_new, col_ind_var,
                                       col_ind_var_cv_new, col_id, row_num,
                                       rel_train, rel_valid, fold_num,
                                       which_fold):
    """
    Split the data into training and validation parts using a random ID
    column that either existed originally or was created during copying.

    The actual split is delegated to __cv_split_data_using_id_col_compute.

    @param rel_source Name of data source table
    @param col_dep_var Name of dependent variable
    @param col_dep_var_cv_new New name for the dependent variable
    @param col_ind_var Name of independent variable, might be an
        array expression
    @param col_ind_var_cv_new A single string to rename the independent
        variable
    @param col_id Name of the unique ID column
    @param row_num Total number of rows
    @param rel_train Name of training data table
    @param rel_valid Name of validation data table
    @param fold_num How many fold cross validation
    @param which_fold Which fold will be used as validation part?
    """
    # Column list string produced by __utils_cv_produce_col_name_string.
    select_list = __utils_cv_produce_col_name_string(
        col_dep_var, col_dep_var_cv_new, col_ind_var, col_ind_var_cv_new)
    __cv_split_data_using_id_col_compute(
        rel_source, select_list, col_id, row_num,
        rel_train, rel_valid, fold_num, which_fold)
| |
| # ======================================================================== |
| |
def __utils_cv_split_data_using_id_tbl(rel_origin, col_dep_var,
                                       col_dep_var_cv_new, col_ind_var,
                                       col_ind_var_cv_new, rel_random_id,
                                       random_id, origin_id, row_num,
                                       rel_train, rel_valid, fold_num,
                                       which_fold):
    """
    Split the data table using a random ID mapping table.

    For the case where a unique ID column exists in the original table but
    is not randomly assigned, so a table mapping this non-random ID to a
    real random ID has been created by __cv_generate_random_id. The actual
    split is delegated to __cv_split_data_using_id_tbl_compute.

    @param rel_origin Name of the original data table
    @param col_dep_var Name of dependent variable
    @param col_dep_var_cv_new New name for the dependent variable
    @param col_ind_var Name of independent variable, might be an
        array expression
    @param col_ind_var_cv_new A single string to rename the independent
        variable
    @param rel_random_id Name of the random ID mapping table
    @param random_id Name of random ID column in the table rel_random_id
    @param origin_id Name of the original non-random ID column in
        rel_origin and rel_random_id
    @param row_num Total number of rows
    @param rel_train Name of training data table
    @param rel_valid Name of validation data table
    @param fold_num How many fold cross validation
    @param which_fold Which fold will be used as validation part?
    """
    # Column list string produced by __utils_cv_produce_col_name_string.
    select_list = __utils_cv_produce_col_name_string(
        col_dep_var, col_dep_var_cv_new, col_ind_var, col_ind_var_cv_new)
    __cv_split_data_using_id_tbl_compute(
        rel_origin, select_list, rel_random_id,
        random_id, origin_id, row_num, rel_train,
        rel_valid, fold_num, which_fold)