| """ |
| @file pca_project.py_in |
| |
| @namespace pca |
| """ |
| |
| import plpy |
| import time |
| |
| from linalg.matrix_ops import cast_dense_input_table_to_correct_columns |
| from linalg.matrix_ops import create_temp_sparse_matrix_table_with_dims |
| from linalg.matrix_ops import get_dims |
| from linalg.matrix_ops import validate_dense |
| from linalg.matrix_ops import validate_sparse |
| from utilities.utilities import __mad_version |
| from utilities.utilities import unique_string |
| from utilities.utilities import _assert |
| from utilities.utilities import _array_to_string |
| from utilities.validate_args import columns_exist_in_table |
| from utilities.validate_args import table_exists |
| from utilities.utilities import add_postfix |
| from utilities.validate_args import get_cols, get_cols_and_types |
| from utilities.control import MinWarning |
| |
| |
| version_wrapper = __mad_version() |
| string_to_array = version_wrapper.select_vecfunc() |
| array_to_string = version_wrapper.select_vec_return() |
| ZERO_THRESHOLD = 1e-6 |
| |
| |
| # Dense PCA project help function |
| def pca_project_help(schema_madlib, usage_string=None, **kwargs): |
| """ |
| Given a usage string, give out function usage information. |
| """ |
| if usage_string is not None and \ |
| usage_string.lower() in ("usage", "help", "?"): |
| return """ |
| ---------------------------------------------------------------- |
| Usage |
| ---------------------------------------------------------------- |
| SELECT {schema_madlib}.pca_project ( |
| 'tbl_source', -- Data table |
| 'pc_table', -- Table with principal componenents |
| (obtained as output from pca_train) |
| 'tbl_result', -- Result table |
| 'row_id', -- Name of the column containing the row_id |
| -- Optional Parameters |
| ---------------------------------------------------------------- |
| 'tbl_residual', -- Residual table (Default: NULL) |
| 'tbl_result_summary', -- Result summary table (Default : NULL) |
| ); |
| |
| Note that if the principal components in pc_table were learnt using |
| grouping_cols in {schema_madlib}.pca_train(), the tbl_source used |
| here must also have those grouping columns. This will fail otherwise. |
| |
| Output Tables |
| -------------------------------------------------------------------- |
| The output is divided into three tables (two of which are optional) |
| |
| -------------------------------------------------------------------- |
| The output table ('tbl_result' above) encodes a dense matrix |
| with the projection onto the principal components. The matrix contains |
| the following columns: |
| |
| 'row_id' INTEGER, -- Row id of the output matrix |
| 'row_vec' DOUBLE PRECISION[], -- A vector containing elements in the row of the matrix |
| grouping_col -- The grouping columns present in the 'pc_table', if any |
| |
| -------------------------------------------------------------------- |
| The residual table ('tbl_residual' above) encodes a dense residual |
| matrix which has the following columns |
| |
| 'row_id' INTEGER, -- Row id of the output matrix |
| 'row_vec' DOUBLE PRECISION[], -- A vector containing elements in the row of the matrix |
| grouping_col -- The grouping columns present in the 'pc_table', if any |
| |
| -------------------------------------------------------------------- |
| The result summary table ('tbl_result_summary' above) has the following columns |
| |
| 'exec_time' INTEGER, -- Wall clock time (ms) of the function. |
| 'residual_norm' DOUBLE PRECISION, -- Absolute error of the residuals |
| 'relative_residual_norm' DOUBLE PRECISION, -- Relative error of the residuals |
| grouping_col -- The grouping columns present in the 'pc_table', if any |
| ---------------------------------------------------------------- |
| """.format(schema_madlib=schema_madlib) |
| |
| else: |
| return """ |
| ---------------------------------------------------------------- |
| Summary: PCA Projection |
| ---------------------------------------------------------------- |
| PCA Projection: Projects a dataset to an already trained |
| space of principal components. |
| -- |
| For function usage information, run |
| SELECT {schema_madlib}.pca_project('usage'); |
| -- |
| """.format(schema_madlib=schema_madlib) |
| |
| |
| # Sparse PCA help function |
| # ------------------------------------------------------------------------ |
| def pca_sparse_project_help(schema_madlib, usage_string=None, **kwargs): |
| """ |
| Given a usage string, give out function usage information. |
| """ |
| if usage_string is not None and \ |
| usage_string.lower() in ("usage", "help", "?"): |
| return """ |
| ---------------------------------------------------------------- |
| Usage |
| ---------------------------------------------------------------- |
| SELECT {schema_madlib}.pca_sparse_project ( |
| 'tbl_source', -- Data table |
| 'pc_table', -- Table with principal componenents |
| (obtained as output from pca_train) |
| 'tbl_result', -- Result table |
| 'row_id', -- Name of the column containing the row_id |
| 'col_id', -- Name of the column containing the col_id |
| 'val_id', -- Name of the column containing the val_id |
| 'row_dim' -- Row dimension of the sparse matrix |
| 'col_dim' -- Column dimension of the sparse matrix |
| -- Optional Parameters |
| ---------------------------------------------------------------- |
| 'tbl_residual', -- Residual table (Default: NULL) |
| 'tbl_result_summary', -- Result summary table (Default : NULL) |
| ); |
| |
| Note that if the principal components in 'pc_table' were learnt using |
| grouping_cols in {schema_madlib}.pca_train(), the tbl_source used |
| here must also have those grouping columns. This will fail otherwise. |
| |
| Output Tables |
| ---------------------------------------------------------------- |
| The output is divided into three tables (two of which are optional) |
| |
| ----------------------------------------------------------------------------------------- |
| The output table ('tbl_result' above) encodes a dense matrix |
| with the projection onto the principal components. The matrix contains |
| the following columns: |
| |
| 'row_id' INTEGER, -- Row id of the output matrix |
| 'row_vec' DOUBLE PRECISION[], -- A vector containing elements in the row of the matrix |
| grouping_col -- The grouping columns present in the 'pc_table', if any |
| |
| ----------------------------------------------------------------------------------------- |
| The residual table ('tbl_residual' above) encodes a dense residual |
| matrix which has the following columns |
| |
| 'row_id' INTEGER, -- Row id of the output matrix |
| 'row_vec' DOUBLE PRECISION[], -- A vector containing elements in the row of the matrix |
| grouping_col -- The grouping columns present in the 'pc_table', if any |
| |
| ----------------------------------------------------------------------------------------- |
| The result summary table ('tbl_result_summary' above) has the following columns |
| |
| 'exec_time' INTEGER, -- Wall clock time (ms) of the function. |
| 'residual_norm' DOUBLE PRECISION, -- Absolute error of the residuals |
| 'relative_residual_norm' DOUBLE PRECISION, -- Relative error of the residuals |
| grouping_col -- The grouping columns present in the 'pc_table', if any |
| ---------------------------------------------------------------- |
| """.format(schema_madlib=schema_madlib) |
| else: |
| return """ |
| ---------------------------------------------------------------- |
| Summary: PCA Projection |
| ---------------------------------------------------------------- |
| PCA Projection: Projects a dataset to an already trained |
| space of principal components. |
| -- |
| For function usage information, run: |
| SELECT {schema_madlib}.pca_sparse_project('usage'); |
| -- |
| """.format(schema_madlib=schema_madlib) |
| |
| def _validate_args(schema_madlib, |
| source_table, |
| pc_table, |
| out_table, |
| row_id, |
| col_id=None, |
| val_id=None, |
| row_dim=None, |
| col_dim=None, |
| residual_table=None, |
| result_summary_table=None): |
| """ |
| Validates all arguments passed to the PCA function |
| |
| Args: |
| @param schema_madlib Name of MADlib schema |
| @param source_table Name of the input table (containing data to project) |
| @param pc_table Name of table with principal components (output by the training function) |
| @param out_table Name of output table to store projection result |
| @param row_id Name of the row_id column |
| @param col_id Name of the col_id column (only for sparse matrices) |
| @param val_id Name of the val_id column (only for sparse matrices) |
| @param row_dim Number of rows in input matrix (only for sparse matrices) |
| @param col_dim Number of columns in input matrix (only for sparse matrices) |
| @param residual_table Name of the residual table (to store error in projection) |
| @param result_summary_table Name of result summary table |
| Returns: |
| None |
| Throws: |
| plpy.error if any argument is invalid |
| """ |
| _assert(source_table is not None and table_exists(source_table), |
| "PCA error: Source data table does not exist!") |
| |
| _assert(pc_table is not None and table_exists(pc_table), |
| "PCA error: Principal comp. table does not exist!") |
| |
| _assert(table_exists(add_postfix(pc_table, "_mean")), |
| "PCA error: Source data table column means does not exist!") |
| |
| # Make sure that the output table does not exist |
| # Also check that the output table is not null |
| _assert(out_table and out_table.strip(), |
| "PCA error: Invalid output table name.") |
| _assert(not table_exists(out_table, only_first_schema=True), |
| "PCA error: Output table {0} already exists!".format(str(out_table))) |
| |
| # Check that the result summary table is not empty |
| if result_summary_table is not None: |
| _assert(result_summary_table.strip(), |
| "PCA error: Invalid result summary table name!") |
| _assert(not table_exists(result_summary_table, only_first_schema=True), |
| "PCA error: Result summary table {0} already exists!". |
| format(result_summary_table)) |
| |
| # Check that the result summary table is not empty |
| if residual_table is not None: |
| _assert(residual_table.strip(), |
| "PCA error: Invalid residual table name!") |
| _assert(not table_exists(residual_table, only_first_schema=True), |
| "PCA error: Residual table {0} already exists!". |
| format(residual_table)) |
| |
| # Check that the row_id exists |
| _assert(columns_exist_in_table(source_table, [row_id], schema_madlib), |
| "PCA error: {1} column does not exist in {0}!". |
| format(source_table, "NULL" if row_id is None else row_id)) |
| |
| # For sparse inputs: Check that the row_id exists |
| if col_id or val_id: |
| _assert(col_id, |
| "PCA error: Column ID should be provided if value ID is input!") |
| _assert(val_id, |
| "PCA error: Value ID should be provided if column ID is input!") |
| _assert(columns_exist_in_table(source_table, [col_id], schema_madlib), |
| "PCA error: {1} column does not exist in {0}!". |
| format(source_table, col_id)) |
| _assert(columns_exist_in_table(source_table, [val_id], schema_madlib), |
| "PCA error: {1} column does not exist in {0}!". |
| format(source_table, val_id)) |
| _assert(row_dim > 0 and col_dim > 0, |
| "PCA error: row_dim/col_dim should be positive integer") |
| |
| # ------------------------------------------------------------------------ |
| |
| |
| def pca_sparse_project(schema_madlib, |
| source_table, |
| pc_table, |
| out_table, |
| row_id, |
| col_id, |
| val_id, |
| row_dim, |
| col_dim, |
| residual_table, |
| result_summary_table, |
| **kwargs): |
| """ |
| PCA projection of the matrix in source_table. |
| |
| This function is the specific call for pca projection. It projects |
| the input matrix into the principal components. |
| |
| Args: |
| @param schema_madlib Name of MADlib schema |
| @param source_table Name of the input table (containing data to project) |
| @param pc_table Name of table with principal components (output by the training function) |
| @param out_table Name of output table to store projection result |
| @param row_id Name of the row_id column |
| @param col_id Name of the col_id column |
| @param val_id Name of the val_id column |
| @param row_dim Number of rows in input matrix |
| @param col_dim Number of columns in input matrix |
| @param residual_table Name of the residual table (to store error in projection) |
| @param result_summary_table Name of result summary table |
| Returns: |
| None |
| Throws: |
| plpy.error if any argument is invalid |
| """ |
| pca_project_wrap(schema_madlib, source_table, pc_table, out_table, |
| row_id, residual_table, result_summary_table, |
| True, col_id, val_id, row_dim, col_dim) |
| |
| |
| # ------------------------------------------------------------------------ |
| def pca_project(schema_madlib, |
| source_table, |
| pc_table, |
| out_table, |
| row_id, |
| residual_table, |
| result_summary_table, |
| **kwargs): |
| """ |
| PCA projection of the matrix in source_table. |
| |
| This function is the specific call for pca projection. It projects |
| the input matrix into the principal components. |
| |
| Args: |
| @param schema_madlib Name of MADlib schema |
| @param source_table Name of the input table (containing data to project) |
| @param pc_table Name of table with principal components (output by the training function) |
| @param out_table Name of output table to store projection result |
| @param row_id Name of the row_id column |
| @param residual_table Name of the residual table (to store error in projection) |
| @param result_summary_table Name of result summary table |
| Returns: |
| None |
| Throws: |
| plpy.error if any argument is invalid |
| """ |
| pca_project_wrap(schema_madlib, source_table, pc_table, out_table, |
| row_id, residual_table, result_summary_table) |
| |
| |
| def pca_project_wrap(schema_madlib, source_table, pc_table, out_table, |
| row_id, residual_table, |
| result_summary_table, is_sparse=False, |
| col_id=None, val_id=None, row_dim=None, |
| col_dim=None, **kwargs): |
| """ |
| This wrapper was added to support grouping columns. This |
| function does the necessary pre-processing for handling |
| grouping_cols, if set. It then constructs a single query that |
| includes a separate "madlib.pca_project_wrap(...)" for each group. |
| """ |
| # Reset the message level to avoid random messages |
| old_msg_level = plpy.execute(""" |
| SELECT setting |
| FROM pg_settings |
| WHERE name='client_min_messages' |
| """)[0]['setting'] |
| plpy.execute('SET client_min_messages TO warning') |
| if is_sparse: |
| _validate_args(schema_madlib, source_table, pc_table, out_table, |
| row_id, col_id, val_id, row_dim, col_dim, residual_table, |
| result_summary_table) |
| else: |
| _validate_args(schema_madlib, source_table, pc_table, out_table, |
| row_id, None, None, None, None, |
| residual_table, result_summary_table) |
| # If we add new columns to the pca_train output table in the future, they should |
| # be included in this list: |
| pc_table_model_cols = ['row_id', 'principal_components', 'std_dev', 'proportion'] |
| grouping_cols_list = [col for col in get_cols(pc_table) if col not in pc_table_model_cols] |
| grouping_cols = '' |
| if grouping_cols_list: |
| grouping_cols = ', '.join(grouping_cols_list) |
| |
| other_columns_in_table = [col for col in get_cols(source_table) if col not in grouping_cols_list] |
| grouping_cols_clause = '' |
| if(grouping_cols): |
| # validate the grouping columns. We currently only support grouping_cols |
| # to be column names in the source_table, and not expressions! |
| _assert(columns_exist_in_table(source_table, grouping_cols_list, schema_madlib), |
| """PCA error: One or more grouping columns in {0} do not exist in {1}, but |
| the model in {2} was learnt with grouping!""".format(grouping_cols, |
| source_table, pc_table)) |
| distinct_grouping_values = plpy.execute(""" |
| SELECT DISTINCT {grouping_cols} FROM {source_table} |
| """.format(grouping_cols=grouping_cols, source_table=source_table)) |
| cols_names_types = get_cols_and_types(source_table) |
| grouping_cols_clause = ', ' + ', '.join([c_name+" "+c_type |
| for (c_name, c_type) in cols_names_types if c_name in grouping_cols_list]) |
| # Create all output tables |
| plpy.execute(""" |
| CREATE TABLE {0} ( |
| row_id INTEGER, |
| row_vec double precision[] |
| {1} |
| ) """.format(out_table, grouping_cols_clause)) |
| if result_summary_table: |
| plpy.execute( |
| """ |
| CREATE TABLE {0} ( |
| exec_time FLOAT8, |
| residual_norm FLOAT8, |
| relative_residual_norm FLOAT8 |
| {1} |
| ) """.format(result_summary_table, grouping_cols_clause)) |
| else: |
| result_summary_table = '' |
| if residual_table: |
| plpy.execute(""" |
| CREATE TABLE {0} ( |
| row_id INTEGER, |
| row_vec double precision[] |
| {1} |
| ) """.format(residual_table, grouping_cols_clause)) |
| if not residual_table: |
| residual_table = '' |
| |
| # declare variables whose values will be different for each group, if |
| # grouping_cols is specified |
| grouping_where_clause = '' |
| sparse_where_condition = '' |
| select_grouping_cols = '' |
| grouping_cols_values = '' |
| result_summary_table_temp = '' |
| other_columns_in_pc_table = [col for col in get_cols(pc_table) if col not in grouping_cols_list] |
| temp_pc_table_columns = ', '.join(other_columns_in_pc_table) |
| original_row_id = row_id |
| |
| other_columns_in_table.remove(row_id) |
| temp_source_table_columns = ','.join(other_columns_in_table) |
| |
| pca_union_call_list = [] |
| grp_id = 0 |
| if not is_sparse: |
| col_id = 'NULL' |
| val_id = 'NULL' |
| row_dim = 0 |
| col_dim = 0 |
| while True: |
| if grouping_cols: |
| grp_value_dict = distinct_grouping_values[grp_id] |
| where_conditions = ' AND '.join([str(key)+"="+str(value) for (key, value) in grp_value_dict.items()]) |
| sparse_where_condition = ' AND ' + where_conditions |
| grouping_where_clause = ' WHERE ' + where_conditions |
| select_grouping_cols = ', ' + ', '.join([str(value)+" AS "+key for (key, value) in grp_value_dict.items()]) |
| grouping_cols_values = ', ' + ', '.join([str(value) for (key, value) in grp_value_dict.items()]) |
| |
| pca_union_call_list.append(""" |
| {schema_madlib}._pca_project_union('{source_table}', '{pc_table}', '{out_table}', |
| '{row_id}', '{original_row_id}', '{grouping_cols}', |
| '{grouping_cols_clause}', '{residual_table}', '{result_summary_table}', |
| {grp_id}, '{grouping_where_clause}', '{sparse_where_condition}','{select_grouping_cols}', |
| '{grouping_cols_values}', '{temp_source_table_columns}', '{temp_pc_table_columns}', |
| {is_sparse}, '{col_id}', '{val_id}', {row_dim}, {col_dim}) |
| """.format(schema_madlib=schema_madlib, |
| source_table=source_table, pc_table=pc_table, |
| out_table=out_table, row_id=row_id, |
| original_row_id=original_row_id, |
| grouping_cols=grouping_cols, |
| grouping_cols_clause=grouping_cols_clause, |
| residual_table=residual_table, |
| result_summary_table=result_summary_table, |
| grp_id=grp_id, grouping_where_clause=grouping_where_clause, |
| sparse_where_condition=sparse_where_condition, |
| select_grouping_cols=select_grouping_cols, |
| grouping_cols_values=grouping_cols_values, |
| temp_source_table_columns=temp_source_table_columns, |
| temp_pc_table_columns=temp_pc_table_columns, is_sparse=is_sparse, |
| col_id=col_id, val_id=val_id, row_dim=row_dim, col_dim=col_dim)) |
| grp_id += 1 |
| if not grouping_cols_list or len(distinct_grouping_values) == grp_id: |
| break |
| # "SELECT <query_1>, <query_2>, <query_3>, ..." is expected to run each |
| # <query_i> in parallel. |
| pca_union_call = 'SELECT ' + ', '.join(pca_union_call_list) |
| plpy.execute(pca_union_call) |
| plpy.execute("SET client_min_messages TO %s" % old_msg_level) |
| |
| def _pca_project_union(schema_madlib, source_table, pc_table, out_table, |
| row_id, original_row_id, grouping_cols, grouping_cols_clause, |
| residual_table, result_summary_table, grp_id, grouping_where_clause, |
| sparse_where_condition, select_grouping_cols, grouping_cols_values, |
| temp_source_table_columns, temp_pc_table_columns, is_sparse, col_id, |
| val_id, row_dim, col_dim, **kwargs): |
| """ |
| The pca_project is performed over each group, if any. |
| |
| Args: |
| @param schema_madlib -- madlib schema name |
| @param source_table -- Source table name (dense matrix) |
| @param pc_table -- Output table name for the principal components |
| @param out_table -- Output table name |
| @param row_id -- Column name for the ID for each row |
| @param original_row_id -- copy of the row_id originally passed |
| @param grouping_cols -- Comma-separated list of grouping columns (Default: NULL) |
| @param grouping_cols_clause -- Part of the SQL query to be used with grouping_cols |
| @param residual_table -- Residual table name |
| @param result_summary_table -- Table name to store summary of results (Default: NULL) |
| @param grp_id -- a place holder id for each group |
| @param grouping_where_clause -- WHERE clause using grouping_cols |
| @param select_grouping_cols -- SELECT clause using grouping_cols |
| @param grouping_cols_values -- distinct values of the grouping_cols |
| @param temp_source_table_columns -- SELECT caluse for creating temporary copy of the source_table |
| @param temp_pc_table_columns -- non grouping_cols of the source_table |
| @param is_sparse -- specifies if the PCA call is for sparse or dense matrices |
| @param col_id -- sparse representation based detail |
| @param val_id -- sparse representation based detail |
| @param row_dim -- sparse representation based detail |
| @param col_dim -- sparse representation based detail |
| |
| Returns: |
| None |
| """ |
| out_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id) |
| if grouping_cols: |
| pc_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id) |
| plpy.execute(""" |
| CREATE TABLE {pc_table_grouped} AS |
| SELECT {temp_pc_table_columns} |
| FROM {pc_table} |
| {grouping_where_clause} |
| """.format(pc_table_grouped=pc_table_grouped, |
| pc_table=pc_table, grouping_where_clause=grouping_where_clause, |
| temp_pc_table_columns=temp_pc_table_columns)) |
| else: |
| pc_table_grouped = pc_table |
| |
| t0 = time.time() # measure the starting time |
| # Step 1: Validate the input arguments |
| if is_sparse: |
| # Step 1.1: Create a copy of the sparse matrix and add row_dims and col_dims |
| # Warning: This changes the column names of the table |
| sparse_table_copy = "pg_temp." + unique_string() + "_sparse_table_copy" |
| create_temp_sparse_matrix_table_with_dims(source_table, sparse_table_copy, |
| row_id, col_id, val_id, |
| row_dim, col_dim, sparse_where_condition) |
| validate_sparse(sparse_table_copy, |
| {'row': row_id, 'col': col_id, 'val': val_id}, |
| check_col=False) |
| # Step 1.2: Densify the input matrix |
| x_dense = "pg_temp." + unique_string() + "_dense" |
| plpy.execute(""" |
| SELECT {schema_madlib}.matrix_densify( |
| '{sparse_table_copy}', 'row={row_id}, col={col_id}, val={val_id}', |
| '{x_dense}', 'row=row_id, col=col_id,val=row_vec') |
| """.format(schema_madlib=schema_madlib, |
| sparse_table_copy=sparse_table_copy, row_id=row_id, |
| col_id=col_id, val_id=val_id, x_dense=x_dense)) |
| plpy.execute(""" |
| DROP TABLE IF EXISTS {0}; |
| """.format(sparse_table_copy)) |
| source_table_grouped = x_dense |
| else: |
| # For Dense matrix format only: |
| # We can now ignore the original row_id for all computations since we will |
| # create a new table with a row_id column that has not duplicates and ranges |
| # from 1 to number of rows in the group/table. This is to mainly support the |
| # grouping scneario where the row_id values might not range between 1 and |
| # number of rows in the group, for each group. Doing this also just extends |
| # this behavior for non-grouping scenarios too. If creating a new temp table |
| # that corrects the row_id column is not of much importance in non-grouping |
| # cases, we can avoid creating the temp table and save some computation time. |
| # But, at the moment, the code creates the temp table even for the non-grouping |
| # scenario. |
| # We don't need to do this for sparse representation because of the nature |
| # of its definition. |
| |
| # Preserve the mapping between new row_id created and the original row_id. This is |
| # required only for dense input format. |
| temp_row_id = "original_row_id" + unique_string() |
| row_id_map_table = "rowid" + unique_string() |
| plpy.execute(""" |
| CREATE TABLE {row_id_map_table} AS |
| SELECT |
| {source_table}.{original_row_id} AS {temp_row_id}, |
| {select_clause} |
| FROM {source_table} |
| {grouping_where_clause} |
| """.format(row_id_map_table=row_id_map_table, |
| original_row_id=original_row_id, |
| temp_row_id=temp_row_id, |
| source_table=source_table, |
| select_clause=""" ROW_NUMBER() OVER() AS row_id """, |
| grouping_where_clause=grouping_where_clause)) |
| |
| # Creation of this temp table is unnecessary if the scenario does not involve |
| # grouping, and/or, the input table had perfect values for the row_id column. |
| # This temp table will ensure pca works even when row_id of the source_table |
| # does not have serially increasing numbers starting from 1; |
| source_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id) |
| plpy.execute(""" |
| CREATE TABLE {source_table_grouped} AS |
| SELECT {row_id_map_table}.row_id, {temp_source_table_columns} |
| FROM |
| ( |
| SELECT * |
| FROM {source_table} |
| {grouping_where_clause} |
| ) t1 |
| INNER JOIN {row_id_map_table} |
| ON {row_id_map_table}.{temp_row_id}=t1.{row_id} |
| """.format(source_table_grouped=source_table_grouped, |
| temp_row_id=temp_row_id, row_id_map_table=row_id_map_table, row_id=row_id, |
| source_table=source_table, grouping_where_clause=grouping_where_clause, |
| temp_source_table_columns=temp_source_table_columns)) |
| |
| row_id = 'row_id' |
| # Make sure that the table has row_id and row_vec |
| source_table_copy = "pg_temp." + unique_string() |
| need_new_column_names = cast_dense_input_table_to_correct_columns( |
| schema_madlib, source_table_grouped, source_table_copy, row_id) |
| |
| if(need_new_column_names): |
| source_table_grouped = source_table_copy |
| [row_dim, col_dim] = get_dims(source_table_grouped, |
| {'row': 'row_id', 'col': 'col_id', |
| 'val': 'row_vec'}) |
| validate_dense(source_table_grouped, |
| {'row': 'row_id', 'col': 'col_id', 'val': 'row_vec'}, |
| check_col=False, row_dim=row_dim) |
| |
| # Step 2: Compute the PCA Projection matrix |
| # The R code to perform this step is |
| # p <- princomp(mat) |
| # low_rank_representation <- mat %*% p$loadings[,1:k] |
| |
| # First normalize the data (Column means) |
| scaled_source_table = "pg_temp." + unique_string() + "_scaled_table" |
| x_std_str = _array_to_string([1] * col_dim) |
| |
| pc_table_mean = add_postfix(pc_table, "_mean") |
| plpy.execute( |
| """ |
| CREATE TABLE {scaled_source_table} |
| m4_ifdef(`__POSTGRESQL__', `', `WITH (appendonly=true)') |
| AS |
| SELECT |
| row_id, |
| ({schema_madlib}.utils_normalize_data( |
| row_vec, |
| (select column_mean from {pc_table_mean} |
| {grouping_where_clause}), |
| '{x_std_str}'::double precision[])) |
| AS row_vec |
| FROM {source_table_grouped} |
| """.format(schema_madlib=schema_madlib, |
| pc_table_mean=pc_table_mean, |
| source_table_grouped=source_table_grouped, |
| scaled_source_table=scaled_source_table, |
| grouping_where_clause=grouping_where_clause, |
| x_std_str=x_std_str)) |
| |
| plpy.execute( |
| """ |
| SELECT {schema_madlib}.matrix_mult('{scaled_source_table}', |
| 'trans=false,row=row_id, col=col_id, val=row_vec', |
| '{pc_table_grouped}', |
| 'trans=TRUE, row=row_id, col=col_id, val=principal_components', |
| '{out_table_grouped}', |
| 'row=row_id, col=col_id,val=row_vec'); |
| """.format(schema_madlib=schema_madlib, |
| scaled_source_table=scaled_source_table, |
| pc_table_grouped=pc_table_grouped, |
| out_table_grouped=out_table_grouped)) |
| |
| # Step 3: Compute the Residual table (if required) |
| # Residual table: res = mat - proj |
| create_residual_table = False |
| if residual_table or result_summary_table: |
| residual_table_grouped = "pg_temp." + unique_string() + "_temp_residual" |
| create_temp_residual_table = False |
| if not residual_table: |
| create_temp_residual_table = True |
| else: |
| create_residual_table = True |
| approx_table = "pg_temp." + unique_string() + "_approx" |
| # Build an approximate reconstruction of the data |
| plpy.execute( |
| """ |
| SELECT {schema_madlib}.matrix_mult('{out_table_grouped}', |
| 'row=row_id, col=col_id, val=row_vec', |
| '{pc_table_grouped}', |
| 'row=row_id, col=col_id, val=principal_components', |
| '{approx_table}', |
| 'row=row_id, col=col_id, val=row_vec'); |
| """.format(schema_madlib=schema_madlib, |
| out_table_grouped=out_table_grouped, |
| pc_table_grouped=pc_table_grouped, |
| approx_table=approx_table)) |
| |
| # Compute the difference between the reconstruction and real data |
| # Note that both the approximation and source data are recentered here |
| plpy.execute( |
| """ |
| SELECT {schema_madlib}.matrix_scale_and_add( |
| '{scaled_source_table}', |
| 'row=row_id, col=col_id, val=row_vec', |
| '{approx_table}', |
| 'row=row_id, col=col_id, val=row_vec', |
| -1, |
| '{residual_table_grouped}', |
| 'row=row_id, col=col_id, val=row_vec'); |
| """.format(schema_madlib=schema_madlib, |
| scaled_source_table=scaled_source_table, |
| approx_table=approx_table, |
| residual_table_grouped=residual_table_grouped)) |
| |
| # Step 4: Compute the results summary table (if required) |
| # If the residual table is not asked by the user, but he does ask for |
| # result summary table, then we need to compute the residuals |
| if result_summary_table: |
| source_table_norm = plpy.execute( |
| """ |
| SELECT {schema_madlib}.matrix_norm('{source_table_grouped}', |
| 'row=row_id, col=col_id, val=row_vec') as r |
| """.format(schema_madlib=schema_madlib, |
| source_table_grouped=source_table_grouped, |
| row_id=row_id))[0]['r'] |
| |
| # Compute the norm of the residual table |
| residual_norm = plpy.execute( |
| """ |
| SELECT {schema_madlib}.matrix_norm('{residual_table_grouped}', |
| 'row=row_id, col=col_id, val=row_vec') as r |
| """.format(schema_madlib=schema_madlib, |
| residual_table_grouped=residual_table_grouped, |
| row_id=row_id))[0]['r'] |
| # Compute the relative error of the norm |
| # Prevent division by zero |
| if(source_table_norm > ZERO_THRESHOLD): |
| relative_residual_norm = residual_norm / source_table_norm |
| else: |
| relative_residual_norm = 0 |
| # Compute the time in milli-seconds |
| t1 = time.time() |
| dt = (t1 - t0) * 1000. |
| |
| plpy.execute( |
| """ |
| INSERT INTO {result_summary_table} VALUES |
| ({dt}, |
| {residual_norm}::double precision, |
| {relative_residual_norm}::double precision |
| {grouping_cols_values} |
| ); |
| """.format(dt=dt, residual_norm=residual_norm, |
| result_summary_table=result_summary_table, |
| relative_residual_norm=relative_residual_norm, |
| grouping_cols_values=grouping_cols_values)) |
| |
| plpy.execute(""" |
| DROP TABLE IF EXISTS {approx_table}; |
| """.format(approx_table=approx_table)) |
| if create_temp_residual_table: |
| plpy.execute(""" |
| DROP TABLE IF EXISTS {0}; |
| """.format(residual_table_grouped)) |
| |
| if is_sparse: |
| ## We don't have to join based on row_id for sparse project. |
| if create_residual_table: |
| plpy.execute(""" |
| INSERT INTO {residual_table} |
| SELECT * {select_grouping_cols} |
| FROM {residual_table_grouped} |
| """.format(residual_table=residual_table, |
| select_grouping_cols=select_grouping_cols, |
| residual_table_grouped=residual_table_grouped)) |
| plpy.execute(""" |
| INSERT INTO {out_table} |
| SELECT * {select_grouping_cols} |
| FROM {out_table_grouped} |
| """.format(out_table=out_table, |
| select_grouping_cols=select_grouping_cols, |
| out_table_grouped=out_table_grouped)) |
| else: |
| output_table_cols = get_cols(out_table_grouped) |
| output_table_cols.remove('row_id') |
| output_table_select_clause = """{row_id_map_table}.{temp_row_id}, |
| {out_table_cols} |
| {select_grouping_cols} |
| """.format(row_id_map_table=row_id_map_table, |
| temp_row_id=temp_row_id, |
| out_table_cols=', '.join(output_table_cols), |
| select_grouping_cols=select_grouping_cols) |
| if create_residual_table: |
| plpy.execute(""" |
| INSERT INTO {residual_table} |
| SELECT {select_clause} |
| FROM {residual_table_grouped} |
| INNER JOIN {row_id_map_table} |
| ON {row_id_map_table}.row_id={residual_table_grouped}.row_id |
| """.format(residual_table=residual_table, |
| select_clause=output_table_select_clause, |
| residual_table_grouped=residual_table_grouped, |
| row_id_map_table=row_id_map_table)) |
| plpy.execute(""" |
| INSERT INTO {out_table} |
| SELECT {select_clause} |
| FROM {out_table_grouped} |
| INNER JOIN {row_id_map_table} |
| ON {row_id_map_table}.row_id={out_table_grouped}.row_id |
| """.format(out_table=out_table, |
| select_clause=output_table_select_clause, |
| out_table_grouped=out_table_grouped, |
| row_id_map_table=row_id_map_table)) |
| plpy.execute(""" |
| DROP TABLE IF EXISTS {0}; |
| """.format(row_id_map_table)) |
| if residual_table or result_summary_table: |
| plpy.execute(""" |
| DROP TABLE IF EXISTS {0} |
| """.format(residual_table_grouped)) |
| plpy.execute(""" |
| DROP TABLE IF EXISTS {0}; |
| DROP TABLE IF EXISTS {1}; |
| DROP TABLE IF EXISTS {2}; |
| """.format(scaled_source_table, |
| source_table_grouped, out_table_grouped)) |
| if grouping_cols: |
| plpy.execute(""" |
| DROP TABLE IF EXISTS {0}; |
| """.format(pc_table_grouped)) |
| |