| |
| from utilities.control import MinWarning |
| from utilities.utilities import unique_string |
| from validation.cv_utils import __cv_copy_data_with_id |
| from validation.cv_utils import __cv_split_data_using_id_col |
| from validation.cv_utils import __cv_split_data_using_id_tbl |
| from validation.cv_utils import __cv_summarize_result |
| from validation.cv_utils import __cv_generate_random_id |
| from utilities.utilities import __mad_version |
| from utilities.utilities import preprocess_keyvalue_params |
| from utilities.validate_args import columns_exist_in_table |
| from utilities.validate_args import table_exists |
| from utilities.validate_args import get_cols |
| import plpy |
| |
| version_wrapper = __mad_version() |
| mad_vec = version_wrapper.select_vecfunc() |
| |
| |
| def __cv_combine_params_type_general(params, params_type, tbl_data, |
| col_random_id, param_explored, |
| explore_value, tbl_input, tbl_output, |
| grp_to_param_tbl=None): |
| """ |
| Create argument list for SQL functions for training, validation and metric measuring |
| |
| Args: |
| @param params A string list of parameter names |
| @param params_type A string list of parameter types |
| @param tbl_data Data table name (training, validation) |
| @param col_random_id The random ID column, a string |
| @param param_explored The name that CV runs through |
| @param explore_value The current value of param_explored that is under study, already converted to string |
| @param tbl_input The input table name for SQL function |
| @param tbl_output The output table name for SQL function |
| |
| Returns: |
| Output is a string, where all parameters are force-casted into their type. |
| """ |
| if len(params) != len(params_type): |
| plpy.error("CV error: The number of parameters ({0}) " |
| "should be equal to the number of types ({1})!". |
| format(params, params_type)) |
| |
| rst = [] |
| # The special keywords |
| # opts = set(["%data%", "%id%", "%error%", "%model%", "%prediction%", |
| # "%explore%", param_explored]) |
| for i, p in enumerate(params): |
| if not p or p.upper() == 'NULL': |
| rst.append("NULL") |
| else: |
| p = p.strip() |
| p_type = params_type[i].strip('"') |
| if p == "%data%": |
| rst.append("\'" + tbl_data + "\'::" + p_type) |
| elif p == "%id%": |
| rst.append("\'" + col_random_id + "\'::" + p_type) |
| elif p == "%error%": |
| rst.append("\'" + tbl_output + "\'::" + p_type) |
| elif p == "%model%": |
| if tbl_input is None: |
| rst.append("\'" + tbl_output + "\'::" + p_type) |
| else: |
| rst.append("\'" + tbl_input + "\'::" + p_type) |
| elif p == "%prediction%": |
| if "%error%" in set(params): |
| rst.append("\'" + tbl_input + "\'::" + p_type) |
| else: |
| rst.append("\'" + tbl_output + "\'::" + p_type) |
| elif p == "%group_param_tbl%": |
| rst.append("\'" + tbl_output + "\'::" + p_type) |
| else: |
| rst.append("\'" + p + "\'::" + p_type) |
| return ','.join(rst) |
| # ------------------------------------------------------------------------ |
| |
| |
| def _replace_explore(params, param_explored, explore_value): |
| """ |
| Args: |
| @param arg |
| |
| Returns: |
| |
| """ |
| for i, p in enumerate(params): |
| if p == param_explored or p is not None and "%explore%" in p: |
| if "=" in p: |
| # p is a string with key-value pairs, each pair separated by ',' |
| pairs = preprocess_keyvalue_params(p) |
| param_list = [] |
| for each_pair in pairs: |
| key_value = [i.strip() for i in each_pair.split("=")] |
| if key_value[1] == "%explore%": |
| param_list.append(str(key_value[0]) + "=" + |
| str(explore_value)) |
| else: |
| param_list.append(each_pair) |
| params[i] = ','.join(param_list) |
| else: |
| params[i] = str(explore_value) |
| |
| # ---------------------------------------------------------------------- |
| |
| |
| def __cv_funcall_general(func, params, params_type, tbl_data, col_random_id, |
| param_explored, explore_value, tbl_input, |
| tbl_output): |
| """ |
| Call training, validation or metric measuring function |
| |
| @param func The name of the SQL function to be called, a string |
| |
| For all other parameters, see the doc string of __cv_combine_params_type_general |
| """ |
| arg_string = __cv_combine_params_type_general( |
| params, params_type, tbl_data, col_random_id, |
| param_explored, explore_value, tbl_input, tbl_output |
| ) |
| sql = "SELECT {func}({arg_string})".format(func=func, arg_string=arg_string) |
| plpy.execute(sql) |
| # ------------------------------------------------------------------------ |
| |
| |
| def __cv_param_type_explored(params, params_type, param_explored): |
| """ |
| Find the type of exploring parameter |
| |
| @param params A string list of parameter names |
| @param params_type A string list of parameter types |
| @param param_explored The name that CV runs through |
| """ |
| for i in range(len(params)): |
| if params[i] == param_explored: |
| return params_type[i] |
| return None |
| # ------------------------------------------------------------------------ |
| |
| |
| def _validate_cv_args(schema_madlib, modelling_func, modelling_params, |
| modelling_params_type, param_explored, |
| predict_func, predict_params, predict_params_type, |
| metric_func, metric_params, metric_params_type, |
| data_tbl, data_id, id_is_random, |
| validation_result, n_folds, data_cols, **kwargs): |
| |
| if any(arg is None for arg in |
| (modelling_func, modelling_params, modelling_params_type, |
| predict_func, predict_params, predict_params_type, |
| metric_func, metric_params, metric_params_type, data_tbl, |
| validation_result, n_folds)): |
| plpy.error("CV error: You have unsupported Null value(s) in arguments!") |
| |
| if not table_exists(data_tbl): |
| plpy.error("CV error: No data table!") |
| |
| if table_exists(validation_result, only_first_schema=True): |
| plpy.error("CV error: Output table already exists!") |
| |
| if not columns_exist_in_table(data_tbl, data_cols, schema_madlib): |
| plpy.error("CV error: Data columns do not exist!") |
| |
| if data_id is not None and id_is_random is None: |
| plpy.error("CV error: Is the data row ID random?") |
| |
| if n_folds <= 1: |
| plpy.error("CV error: Cross validation total fold number should " |
| "be larger than 1!") |
| |
| n_rows = plpy.execute("SELECT count(*) AS n_rows FROM " + data_tbl |
| )[0]["n_rows"] |
| if n_rows <= n_folds: |
| plpy.error("CV error: Number of rows less than number of folds") |
| |
| return n_rows |
| # ------------------------------------------------------------------------ |
| |
| |
| def _create_data_tbl_id(data_tbl, data_cols, tbl_all_data, data_id, |
| id_is_random, col_random_id, tbl_random_id, **kwargs): |
| """ Ensure that the data table has a random id column by one of the following: |
| 1. creating a copy of the data table with a random id added in |
| 2. creating a mapping from existing id to a random id |
| 3. use original table if it already has a random id |
| """ |
| if data_id is None: |
| # unique ID column is not given, has to copy the data and create the ID |
| __cv_copy_data_with_id(data_tbl, data_cols, tbl_all_data, col_random_id) |
| tbl_used = tbl_all_data |
| elif id_is_random: |
| # unique ID column is given and is random |
| tbl_used = data_tbl |
| col_random_id = data_id |
| else: |
| # the provided unique ID is not random, create a table |
| # mapping the given ID to a random ID |
| __cv_generate_random_id(data_tbl, data_id, tbl_random_id, col_random_id) |
| tbl_used = data_tbl |
| return tbl_used, col_random_id |
| # ---------------------------------------------------------------------- |
| |
| |
| def _one_step_cv(tbl_output_model, tbl_output_pred, tbl_output_error, tbl_accum_error, |
| col_random_id, temp_param_explored, explore_value, |
| output_created, explore_type_str, data_id, |
| modelling_func, modelling_params, modelling_params_type, tbl_train, |
| predict_func, predict_params, predict_params_type, tbl_valid, |
| metric_func, metric_params, metric_params_type, k=0, append_k=False, |
| use_existing_tables=False, **kwargs): |
| """ |
| Args: |
| @param schema_madlib |
| |
| Returns: |
| """ |
| # Some modules, such as decision tree, |
| # support appending the results from different cv folds |
| # to one single table to improve the performance, |
| # while most of other modules create its own |
| # output tables in each cv fold. |
| if not use_existing_tables: |
| plpy.execute(""" |
| DROP TABLE IF EXISTS {tbl_output_model}; |
| DROP TABLE IF EXISTS {tbl_output_model}_summary; |
| DROP TABLE IF EXISTS {tbl_output_pred}; |
| DROP TABLE IF EXISTS {tbl_output_error} |
| """.format(**locals())) |
| |
| # In case the functions need cv fold number k |
| # For example, in decision tree cv, we keep append |
| # results to the same tables and use k to distinguish |
| # them. |
| if append_k: |
| # We do not want to change the original values |
| # that are passed into this function, so '+=' |
| # cannot be used here. Otherwise, every call to |
| # this function would append a new item to the |
| # lists. (modelling_params += [str(k)] is wrong). |
| modelling_params = modelling_params + [str(k)] |
| modelling_params_type = modelling_params_type + ['INTEGER'] |
| predict_params = predict_params + [str(k)] |
| predict_params_type = predict_params_type + ['INTEGER'] |
| metric_params = metric_params + [str(k)] |
| metric_params_type = metric_params_type + ['INTEGER'] |
| use_fold = 'WHERE k = ' + str(k) |
| else: |
| use_fold = '' |
| |
| # train |
| __cv_funcall_general( |
| modelling_func, modelling_params, modelling_params_type, |
| tbl_train, col_random_id, temp_param_explored, |
| explore_value, None, tbl_output_model) |
| |
| # validation |
| __cv_funcall_general( |
| predict_func, predict_params, predict_params_type, |
| tbl_valid, col_random_id, temp_param_explored, |
| explore_value, tbl_output_model, tbl_output_pred) |
| |
| # measure the performance metric |
| __cv_funcall_general( |
| metric_func, metric_params, metric_params_type, |
| tbl_valid, col_random_id, temp_param_explored, |
| explore_value, tbl_output_pred, tbl_output_error) |
| |
| # accumulate the measured metric result |
| if not output_created: |
| plpy.execute(""" |
| DROP TABLE IF EXISTS {tbl_accum_error}; |
| CREATE TEMP TABLE {tbl_accum_error} as |
| SELECT |
| ({explore_value}){explore_type_str} as {temp_param_explored}, |
| {tbl_output_error}.* |
| FROM {tbl_output_error} |
| {use_fold} |
| """.format(**locals())) |
| output_created = True |
| else: |
| plpy.execute(""" |
| INSERT INTO {tbl_accum_error} |
| SELECT |
| ({explore_value}){explore_type_str} as {temp_param_explored}, |
| {tbl_output_error}.* |
| FROM {tbl_output_error} |
| {use_fold} |
| """.format(**locals())) |
| return output_created |
| # ---------------------------------------------------------------------- |
| |
| |
| # perform cross validation |
| def cross_validation_general( |
| schema_madlib, |
| modelling_func, modelling_params, modelling_params_type, |
| param_explored, explore_values, |
| predict_func, predict_params, predict_params_type, |
| metric_func, metric_params, metric_params_type, |
| data_tbl, data_id, id_is_random, |
| validation_result, data_cols, n_folds, **kwargs): |
| """ |
| A general framework that runs cross-validation for modules that are |
| consistent with certain SQL API. |
| |
| See dev-doc for details. |
| """ |
| with MinWarning("warning"): |
| if not data_cols: |
| data_cols = get_cols(data_tbl, schema_madlib) |
| |
| n_rows = _validate_cv_args(**locals()) |
| |
| explore_type = __cv_param_type_explored(modelling_params, |
| modelling_params_type, |
| param_explored) |
| explore_type_str = "" if not explore_type else "::" + str(explore_type) |
| |
| # all temporary names |
| tbl_all_data = unique_string(desp='tbl_all_data') |
| tbl_train = unique_string(desp='tbl_train') |
| tbl_valid = unique_string(desp='tbl_valid') |
| col_random_id = unique_string(desp='col_random_id') |
| tbl_random_id = unique_string(desp='tbl_random_id') |
| tbl_output_model = "pg_temp." + unique_string(desp='output_model') |
| tbl_output_pred = "pg_temp." + unique_string(desp='output_pred') |
| tbl_output_error = "pg_temp." + unique_string(desp='output_error') |
| tbl_accum_error = unique_string(desp='accum_error') |
| |
| tbl_used, col_random_id = _create_data_tbl_id(**locals()) |
| |
| # try to be as general as possible |
| # validation using each explore_value |
| if not explore_values: |
| # if no parameter needs to be explored |
| # just do the data folding |
| temp_param_explored = unique_string() |
| temp_explore_values = [-1] |
| data_folding_only = True |
| else: |
| temp_param_explored = param_explored |
| temp_explore_values = explore_values |
| data_folding_only = False |
| |
| output_created = False |
| modelling_params_orig = modelling_params |
| for k in range(n_folds): |
| # split data into train and validation parts |
| if not data_id or id_is_random: |
| __cv_split_data_using_id_col(tbl_used, data_cols, col_random_id, |
| n_rows, tbl_train, tbl_valid, |
| n_folds, k + 1) |
| else: |
| __cv_split_data_using_id_tbl(tbl_used, data_cols, tbl_random_id, |
| col_random_id, data_id, n_rows, |
| tbl_train, tbl_valid, n_folds, k + 1) |
| |
| for explore_value in temp_explore_values: |
| modelling_params = modelling_params_orig[:] |
| _replace_explore(modelling_params, param_explored, explore_value) |
| output_created = _one_step_cv(**locals()) |
| |
| # compute the averages and standard deviations for all measured metrics (not necessary one) |
| __cv_summarize_result(tbl_accum_error, validation_result, |
| temp_param_explored, data_folding_only, |
| schema_madlib) |
| |
| # clean up the temporary tables |
| plpy.execute(""" |
| DROP TABLE IF EXISTS grp_key_to_cp; |
| DROP TABLE IF EXISTS {tbl_all_data}; |
| DROP TABLE IF EXISTS {tbl_train}; |
| DROP TABLE IF EXISTS {tbl_valid}; |
| DROP TABLE IF EXISTS {tbl_random_id}; |
| DROP TABLE IF EXISTS {tbl_output_model}; |
| DROP TABLE IF EXISTS {tbl_output_pred}; |
| DROP TABLE IF EXISTS {tbl_output_error}; |
| DROP TABLE IF EXISTS {tbl_accum_error}; |
| """.format(**locals())) |
| return None |
| # ------------------------------------------------------------------------ |
| |
| # XXX Currently, this function is used only by the decision tree, |
| # which appends the outputs from different cv folds to a single |
| # table to reduce catalog changes and thus improve the performance. |
| |
| |
| def cross_validation_grouping_w_params( |
| schema_madlib, |
| modelling_func, modelling_params, modelling_params_type, |
| predict_func, predict_params, predict_params_type, |
| metric_func, metric_params, metric_params_type, |
| group_to_param_list_table, param_list_name, grouping_cols, |
| data_tbl, data_id, id_is_random, validation_result, param_explored, |
| data_cols, n_folds, **kwargs): |
| """ |
| A general framework that runs cross-validation for modules that are |
| consistent with certain SQL API. |
| |
| See dev-doc for details. |
| """ |
| with MinWarning("warning"): |
| if not data_cols: |
| data_cols = get_cols(data_tbl, schema_madlib) |
| n_rows = _validate_cv_args(**locals()) |
| |
| explore_type_str = "::INTEGER" |
| |
| # all temporary names |
| tbl_all_data = unique_string() |
| tbl_train = unique_string() |
| tbl_valid = unique_string() |
| col_random_id = unique_string() |
| tbl_random_id = unique_string() |
| tbl_output_model = "pg_temp." + unique_string() |
| tbl_output_pred = "pg_temp." + unique_string() |
| tbl_output_error = "pg_temp." + unique_string() |
| tbl_accum_error = unique_string() |
| grp_to_param_tbl = unique_string() |
| temp_param_explored = unique_string() |
| |
| tbl_used, col_random_id = _create_data_tbl_id(**locals()) |
| |
| output_created = False |
| |
| grouping_col_str = "" if not grouping_cols else grouping_cols + ',' |
| |
| plpy.execute("DROP TABLE IF EXISTS {0}".format(grp_to_param_tbl)) |
| plpy.execute("""CREATE TEMP TABLE {tbl} AS |
| SELECT {grouping_col_str} |
| {param_list_name}[1] as explore_value |
| FROM {group_to_param_list_table} |
| LIMIT 0 |
| """.format(tbl=grp_to_param_tbl, |
| grouping_col_str=grouping_col_str, |
| param_list_name=param_list_name, |
| group_to_param_list_table=group_to_param_list_table)) |
| |
| _replace_explore(modelling_params, unique_string(), grp_to_param_tbl) |
| # get the maximum size of the param list among all groups |
| max_len = plpy.execute(""" |
| SELECT max(array_upper({0}, 1)) as max_len |
| FROM {1} |
| """.format(param_list_name, group_to_param_list_table))[0]["max_len"] |
| |
| for k in range(n_folds): |
| # split data into train and validation parts |
| if not data_id or id_is_random: |
| __cv_split_data_using_id_col(tbl_used, data_cols, col_random_id, |
| n_rows, tbl_train, tbl_valid, |
| n_folds, k + 1) |
| else: |
| __cv_split_data_using_id_tbl(tbl_used, data_cols, tbl_random_id, |
| col_random_id, data_id, n_rows, |
| tbl_train, tbl_valid, n_folds, k + 1) |
| |
| for explore_value in range(1, max_len + 1): |
| plpy.execute("TRUNCATE TABLE {0}".format(grp_to_param_tbl)) |
| query = """ |
| INSERT INTO {grp_to_param_tbl} |
| SELECT {grouping_col_str} |
| (CASE WHEN {explore_value} <= array_upper({param_list_name}, 1) |
| THEN {param_list_name}[{explore_value}] |
| ELSE {param_list_name}[array_upper({param_list_name}, 1)] |
| END) as explore_value |
| FROM {group_to_param_list_table} |
| """.format(**locals()) |
| plpy.execute(query) |
| |
| # XXX Decision tree uses existing tables to store the outputs |
| # from all cv folds. |
| use_existing_tables = True |
| # XXX To reduce the number of truncations on the existing tables |
| # including the model table and error table, we just append new |
| # results to them together with the fold ID. |
| # XXX The prediction table is to big and we have to use table truncation |
| # to speed up the search in the prediction table. |
| append_k = True # k is fold ID to distinguish different sets of results. |
| |
| output_created = _one_step_cv(**locals()) |
| |
| # compute the averages and standard deviations of cv_error for each group and explore value |
| group_using_str = ('' if not grouping_cols |
| else ("USING (" + grouping_cols + ")")) |
| sql_create_validation_result = """ |
| CREATE TABLE {validation_result} AS |
| SELECT |
| {grouping_col_str} |
| {param_list_name}[{temp_param_explored}] AS {param_explored}, |
| avg(cv_error) AS cv_error_avg, |
| stddev(cv_error) AS cv_error_stddev |
| FROM |
| {tbl_accum_error} |
| NATURAL JOIN |
| {group_to_param_list_table} |
| WHERE {temp_param_explored} <= array_upper({param_list_name}, 1) |
| GROUP BY {grouping_col_str} {temp_param_explored}, {param_list_name} |
| """.format(**locals()) |
| plpy.execute(sql_create_validation_result) |
| |
| plpy.execute(""" |
| DROP TABLE IF EXISTS {grp_to_param_tbl}; |
| DROP TABLE IF EXISTS {tbl_all_data}; |
| DROP TABLE IF EXISTS {tbl_train}; |
| DROP TABLE IF EXISTS {tbl_valid}; |
| DROP TABLE IF EXISTS {tbl_random_id}; |
| DROP TABLE IF EXISTS {tbl_output_model}; |
| DROP TABLE IF EXISTS {tbl_output_pred}; |
| DROP TABLE IF EXISTS {tbl_output_error}; |
| DROP TABLE IF EXISTS {tbl_accum_error}; |
| """.format(**locals())) |
| return None |
| # ---------------------------------------------------------------------- |
| |
| |
| def cv_linregr_train(schema_madlib, tbl_source, col_ind_var, |
| col_dep_var, tbl_result, **kwargs): |
| """ |
| A wrapper for linear regression to be used in general CV |
| """ |
| plpy.execute(""" |
| create table {tbl_result} as |
| select ({schema_madlib}.linregr({col_dep_var}, {col_ind_var})).* |
| from {tbl_source} |
| """.format(schema_madlib=schema_madlib, |
| tbl_source=tbl_source, |
| tbl_result=tbl_result, |
| col_ind_var=col_ind_var, |
| col_dep_var=col_dep_var)) |
| return None |
| # ------------------------------------------------------------------------ |
| |
| |
| def cv_linregr_predict(schema_madlib, tbl_model, tbl_newdata, col_ind_var, |
| col_id, tbl_predict, **kwargs): |
| """ |
| A wrapper function for linear prediction for CV |
| """ |
| plpy.execute(""" |
| create table {tbl_predict} as |
| select |
| {col_id}, |
| {schema_madlib}.linregr_predict( |
| coef, |
| {col_ind_var}) as prediction |
| from |
| {tbl_model}, {tbl_newdata} |
| """.format(**locals())) |
| # ------------------------------------------------------------------------ |
| |
| |
| def cv_logregr_predict(schema_madlib, tbl_model, tbl_newdata, col_ind_var, |
| col_id, tbl_predict, **kwargs): |
| """ |
| A wrapper function for logistic prediction for CV |
| """ |
| plpy.execute(""" |
| create table {tbl_predict} as |
| select |
| {col_id}, |
| {schema_madlib}.logregr_predict( |
| coef, |
| {col_ind_var}) as prediction |
| from |
| {tbl_model}, {tbl_newdata} |
| """.format(**locals())) |
| # ------------------------------------------------------------------------ |
| |
| |
| def cv_logregr_accuracy(schema_madlib, tbl_predict, tbl_source, col_id, |
| col_dep_var, tbl_accuracy, **kwargs): |
| """ |
| A wrapper function for logistic accuracy for CV |
| """ |
| plpy.execute(""" |
| create table {tbl_accuracy} as |
| select |
| avg(case when |
| {tbl_predict}.prediction = {tbl_source}.{col_dep_var} |
| then 1 |
| else 0 |
| end) as accuracy |
| from |
| {tbl_predict}, {tbl_source} |
| where {tbl_predict}.{col_id} = {tbl_source}.{col_id} |
| """.format(**locals())) |