| |
| import math |
| import plpy |
| from utilities.utilities import __mad_version |
| |
| ## ======================================================================== |
| |
| version_wrapper = __mad_version() |
| mad_vec = version_wrapper.select_vecfunc() |
| ## ======================================================================== |
| |
| |
| def __cv_produce_col_name_string(tbl, cols, exclude_cols=None): |
| """ |
| Given an array of strings, pick out the column names and form a single |
| string, so that we could select only the necessary data when copying is |
| inevitable. |
| |
| @param tbl Name of the table that contains the columns |
| @param cols A string list of column names |
| """ |
| if not exclude_cols: |
| exclude_cols = [] |
| rst = [tbl + "." + col for col in cols if col not in exclude_cols] |
| return ','.join(rst) |
| ## ======================================================================== |
| |
| |
| def __cv_copy_data_with_id(rel_origin, col_data, rel_copied, random_id): |
| """ |
| If the user does not provide a ID column, the data table has to be |
| copied and at the same time create a random ID column with it. |
| |
| @param rel_origin Original table name, a string |
| @param col_data A string list of columns that contain needed data |
| @param rel_copied Name of the table that copies the data from the original table |
| @param random_id Column name for random unqiue ID in the newly created copied table |
| """ |
| # We want to select only the columns that will be used in the computation. |
| col_string = __cv_produce_col_name_string(rel_origin, col_data) |
| __cv_copy_data_with_id_compute(rel_origin, col_string, rel_copied, random_id) |
| return None |
| ## ======================================================================== |
| |
| |
| def __cv_copy_data_with_id_compute(rel_origin, col_string, rel_copied, random_id): |
| plpy.execute(""" |
| select setseed(0.5); |
| drop table if exists {rel_copied}; |
| create temp table {rel_copied} as |
| select |
| row_number() over (order by random()) as {random_id}, |
| {col_string} |
| from {rel_origin} |
| """.format(rel_origin=rel_origin, rel_copied=rel_copied, |
| random_id=random_id, col_string=col_string)) |
| return None |
| ## ======================================================================== |
| |
| |
| def __cv_summarize_result(tbl_accum_error, tbl_result, param_explored, |
| data_folding_only, schema_madlib): |
| """ |
| Summarize the result. For each column in the metric measured in cross validation, |
| compute the average and standard deviation for the metrics. For ridge and lasso, |
| there is only one error column, but for other modules, there might be multiple |
| metric values. "_avg" and "_stddev" will be appended to each generated column. |
| |
| @param tbl_accum_error Name of the table that accumulates all measured metric in CV |
| @param tbl_result Name of the table that will store the summarized result |
| @param param_explored Name of the parameter that CV runs through, a string |
| """ |
| agg = version_wrapper.select_array_agg(schema_madlib) |
| cols = plpy.execute(""" |
| select {agg}(column_name::varchar) as col |
| from information_schema.columns |
| where table_name = '{tbl_accum_error}' |
| """.format(tbl_accum_error=tbl_accum_error, |
| agg=agg))[0]["col"] |
| cols = mad_vec(cols) |
| |
| kwargs = dict(tbl_accum_error=tbl_accum_error, |
| tbl_result=tbl_result, |
| param_explored=param_explored) |
| |
| ecols = [] |
| for i in range(len(cols)): |
| if cols[i] != param_explored: |
| ecols.append(cols[i]) |
| col_string = "{param_explored}, ".format(**kwargs) |
| for i in range(len(ecols)): |
| col_string += "avg({col}) as {col}_avg, stddev({col}) as {col}_stddev".format(col=ecols[i]) |
| if i != len(ecols) - 1: |
| col_string += ", " |
| |
| plpy.execute(""" |
| drop table if exists {tbl_result}; |
| create table {tbl_result} as |
| select {col_string} |
| from {tbl_accum_error} |
| group by {param_explored} |
| order by {param_explored} |
| """.format(col_string=col_string, **kwargs)) |
| |
| if data_folding_only: |
| plpy.execute("alter table " + tbl_result + " drop column " + param_explored) |
| return None |
| ## ======================================================================== |
| |
| |
| def __cv_generate_random_id(tbl_origin, col_id, tbl_random_id, random_id): |
| """ |
| Create an random ID column for a given data table. |
| |
| If the user provides an ID column, which can uniquely identify the rows, |
| create a table which maps the row ID to a random integer, which is |
| then used as the ID when splitting data. |
| |
| If the ID provided by user is already random, then there is no need to |
| call this function. |
| |
| @param tbl_origin Name of the original data table |
| @param col_id Name of the original unique ID column |
| @param tbl_random_id Name of the table that maps the original non-random ID to |
| a random ID. It has two columns, one for the original ID |
| and the other for the newly created random ID. |
| @param random_id Name of the column that contains the newly created random ID |
| @param origin_id Name of the column that contains the origin ID |
| """ |
| plpy.execute(""" |
| drop table if exists {tbl_random_id}; |
| create temp table {tbl_random_id} as |
| select |
| row_number() over (order by random()) as {random_id}, |
| {col_id} |
| from {tbl_origin} |
| """.format(tbl_origin=tbl_origin, |
| col_id=col_id, |
| tbl_random_id=tbl_random_id, |
| random_id=random_id)) |
| return None |
| ## ======================================================================== |
| |
| |
| def __cv_validation_rows(row_num, n_folds, which_fold): |
| """ |
| Compute the start and ending rows of each validation fold. |
| |
| A validation data set would correspond to the asymmetric range [start_row, end_row) |
| |
| Since the ID column is already random, a consecutive part of the ID is |
| enough to extract a good sample of validation data. |
| |
| To ensure even-sized folds, the fold sizes are computed as: |
| Say there are N instances and k folds, then |
| r = N mod k (the remainder of N divided by k) |
| is the number of surplus records. |
| The first r folds will have floor(N/k) + 1 records, |
| the other ones just floor(N/k) |
| """ |
| r = row_num % n_folds |
| fold_size = int(math.floor(row_num * 1.0 / n_folds)) |
| if which_fold > r: |
| start_row = r * (fold_size + 1) + (which_fold - r - 1) * fold_size + 1 |
| end_row = start_row + fold_size |
| else: |
| start_row = (which_fold - 1) * (fold_size + 1) + 1 |
| end_row = start_row + (fold_size + 1) |
| |
| return (start_row, end_row) |
| ## ======================================================================== |
| |
| |
| def __cv_split_data_using_id_col(rel_source, col_data, col_id, row_num, |
| rel_train, rel_valid, n_folds, which_fold): |
| """ |
| A random ID column exists (originally exists or was created during copying), |
| split the data into training and validation. |
| |
| @param rel_source Name of data source table |
| @param col_data A string list of data columns |
| @param col_id Name of the unique ID column |
| @param row_num Total number of rows |
| @param rel_train Name of training data table |
| @param rel_valid Name of validation data table |
| @param n_folds How many fold cross validation |
| @param which_fold Which fold will be used as validation part? |
| """ |
| col_string = __cv_produce_col_name_string(rel_source, col_data, [col_id]) |
| __cv_split_data_using_id_col_compute(rel_source, col_string, col_id, row_num, |
| rel_train, rel_valid, n_folds, which_fold) |
| return None |
| # ======================================================================== |
| |
| |
| def __cv_split_data_using_id_col_compute(rel_source, col_string, col_id, row_num, |
| rel_train, rel_valid, n_folds, which_fold): |
| (start_row, end_row) = __cv_validation_rows(row_num, n_folds, which_fold) |
| kwargs = dict(rel_train=rel_train, rel_source=rel_source, |
| col_id=col_id, start_row=start_row, |
| rel_valid=rel_valid, |
| end_row=end_row, col_string=col_string) |
| # Extract the training part of data, |
| # which corresponds to rows outside of [start_row, end_row). |
| # Extract the validation part of data, |
| # which corresponds to rows inside of [start_row, end_row). |
| sql = """ |
| drop table if exists {rel_train}; |
| create temp table {rel_train} as |
| select {col_id}, {col_string} from {rel_source} |
| where {col_id} < {start_row} |
| or {col_id} >= {end_row}; |
| |
| drop table if exists {rel_valid}; |
| create temp table {rel_valid} as |
| select {col_id}, {col_string} from {rel_source} |
| where {col_id} >= {start_row} |
| and {col_id} < {end_row} |
| """.format(**kwargs) |
| plpy.execute(sql) |
| return None |
| ## ======================================================================== |
| |
| |
| def __cv_split_data_using_id_tbl(rel_origin, col_data, rel_random_id, |
| random_id, origin_id, row_num, rel_train, |
| rel_valid, n_folds, which_fold): |
| """ |
| Split the data table using a random ID mapping table |
| |
| A unique ID column exists in the original table, but it is not randomly assigned. |
| Thus a table that maps this non-random ID to a real random ID has been created by |
| __cv_generate_random_id. |
| |
| @param rel_origin Name of the original data table |
| @param col_data A string list of data columns |
| @param rel_random_id Name of the random ID mapping table |
| @param random_id Name of random ID column in the table rel_random_id |
| @param origin_id Name of the original non-random ID column in rel_origin and rel_random_id |
| @param row_num Total number of rows |
| @param rel_train Name of training data table |
| @param rel_valid Name of validation data table |
| @param n_folds How many fold cross validation |
| @param which_fold Which fold will be used as validation part? |
| """ |
| col_string = __cv_produce_col_name_string(rel_origin, col_data) |
| __cv_split_data_using_id_tbl_compute(rel_origin, col_string, rel_random_id, |
| random_id, origin_id, row_num, rel_train, |
| rel_valid, n_folds, which_fold) |
| return None |
| ## ======================================================================== |
| |
| |
| def __cv_split_data_using_id_tbl_compute(rel_origin, col_string, rel_random_id, |
| random_id, origin_id, row_num, rel_train, |
| rel_valid, n_folds, which_fold): |
| (start_row, end_row) = __cv_validation_rows(row_num, n_folds, which_fold) |
| # Extract the training part of data, |
| # which corresponds to rows outside of [start_row, end_row). |
| # Extract the validation part of data, |
| # which corresponds to rows inside of [start_row, end_row). |
| sql = """ |
| drop table if exists {rel_train}; |
| create temp table {rel_train} as |
| select {rel_random_id}.{random_id}, {col_string} |
| from {rel_origin}, {rel_random_id} |
| where |
| {rel_origin}.{origin_id} = {rel_random_id}.{origin_id} |
| and ( |
| {rel_random_id}.{random_id} < {start_row} |
| or |
| {rel_random_id}.{random_id} >= {end_row}); |
| |
| drop table if exists {rel_valid}; |
| create temp table {rel_valid} as |
| select {rel_random_id}.{random_id}, {col_string} |
| from {rel_origin}, {rel_random_id} |
| where |
| {rel_origin}.{origin_id} = {rel_random_id}.{origin_id} |
| and ( |
| {rel_random_id}.{random_id} >= {start_row} |
| and |
| {rel_random_id}.{random_id} < {end_row}) |
| """.format(**locals()) |
| plpy.execute(sql) |
| return None |