# blob: 20e7ae52f16ce99925d2aa37b6fad6548e6a784b [file] [log] [blame]
"""
@file pca_project.py_in
@namespace pca
"""
import plpy
import time
from linalg.matrix_ops import cast_dense_input_table_to_correct_columns
from linalg.matrix_ops import create_temp_sparse_matrix_table_with_dims
from linalg.matrix_ops import get_dims
from linalg.matrix_ops import validate_dense
from linalg.matrix_ops import validate_sparse
from utilities.utilities import __mad_version
from utilities.utilities import unique_string
from utilities.utilities import _assert
from utilities.utilities import _array_to_string
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_exists
from utilities.utilities import add_postfix
from utilities.validate_args import get_cols, get_cols_and_types
from utilities.control import MinWarning
# Helper object returned by __mad_version(); the two names bound from it
# below are not referenced by the functions visible in this file.
# NOTE(review): presumably they select version-specific array/string
# conversion helpers -- confirm against utilities.utilities.
version_wrapper = __mad_version()
string_to_array = version_wrapper.select_vecfunc()
array_to_string = version_wrapper.select_vec_return()
# The relative residual norm is only computed when the norm of the source
# matrix is strictly greater than this value; otherwise it is reported as 0
# to avoid dividing by (almost) zero.
ZERO_THRESHOLD = 1e-6
# Dense PCA project help function
def pca_project_help(schema_madlib, usage_string=None, **kwargs):
    """
    Return the help text for the dense PCA projection function.

    Args:
        @param schema_madlib  Name of the MADlib schema; substituted into the
                              example SQL contained in the help text
        @param usage_string   Help topic. "usage", "help" or "?" (compared
                              case-insensitively) selects the detailed usage
                              text; any other value, including None, selects
                              the short summary
        @param kwargs         Accepted for call compatibility and ignored

    Returns:
        String holding the requested help text
    """
    wants_usage = (usage_string is not None and
                   usage_string.lower() in ("usage", "help", "?"))
    if not wants_usage:
        return """
----------------------------------------------------------------
Summary: PCA Projection
----------------------------------------------------------------
PCA Projection: Projects a dataset to an already trained
space of principal components.
--
For function usage information, run
SELECT {schema_madlib}.pca_project('usage');
--
""".format(schema_madlib=schema_madlib)

    # The usage text must stay in sync with the tables created by
    # pca_project_wrap: the summary table stores exec_time as FLOAT8
    # (i.e. DOUBLE PRECISION), not INTEGER.
    return """
----------------------------------------------------------------
Usage
----------------------------------------------------------------
SELECT {schema_madlib}.pca_project (
'tbl_source', -- Data table
'pc_table', -- Table with principal components
-- (obtained as output from pca_train)
'tbl_result', -- Result table
'row_id', -- Name of the column containing the row_id
-- Optional Parameters
----------------------------------------------------------------
'tbl_residual', -- Residual table (Default: NULL)
'tbl_result_summary' -- Result summary table (Default : NULL)
);
Note that if the principal components in pc_table were learnt using
grouping_cols in {schema_madlib}.pca_train(), the tbl_source used
here must also have those grouping columns. This will fail otherwise.
Output Tables
--------------------------------------------------------------------
The output is divided into three tables (two of which are optional)
--------------------------------------------------------------------
The output table ('tbl_result' above) encodes a dense matrix
with the projection onto the principal components. The matrix contains
the following columns:
'row_id' INTEGER, -- Row id of the output matrix
'row_vec' DOUBLE PRECISION[], -- A vector containing elements in the row of the matrix
grouping_col -- The grouping columns present in the 'pc_table', if any
--------------------------------------------------------------------
The residual table ('tbl_residual' above) encodes a dense residual
matrix which has the following columns
'row_id' INTEGER, -- Row id of the output matrix
'row_vec' DOUBLE PRECISION[], -- A vector containing elements in the row of the matrix
grouping_col -- The grouping columns present in the 'pc_table', if any
--------------------------------------------------------------------
The result summary table ('tbl_result_summary' above) has the following columns
'exec_time' DOUBLE PRECISION, -- Wall clock time (ms) of the function.
'residual_norm' DOUBLE PRECISION, -- Absolute error of the residuals
'relative_residual_norm' DOUBLE PRECISION, -- Relative error of the residuals
grouping_col -- The grouping columns present in the 'pc_table', if any
----------------------------------------------------------------
""".format(schema_madlib=schema_madlib)
# Sparse PCA help function
# ------------------------------------------------------------------------
def pca_sparse_project_help(schema_madlib, usage_string=None, **kwargs):
    """
    Return the help text for the sparse PCA projection function.

    Args:
        @param schema_madlib  Name of the MADlib schema; substituted into the
                              example SQL contained in the help text
        @param usage_string   Help topic. "usage", "help" or "?" (compared
                              case-insensitively) selects the detailed usage
                              text; any other value, including None, selects
                              the short summary
        @param kwargs         Accepted for call compatibility and ignored

    Returns:
        String holding the requested help text
    """
    wants_usage = (usage_string is not None and
                   usage_string.lower() in ("usage", "help", "?"))
    if not wants_usage:
        return """
----------------------------------------------------------------
Summary: PCA Projection
----------------------------------------------------------------
PCA Projection: Projects a dataset to an already trained
space of principal components.
--
For function usage information, run:
SELECT {schema_madlib}.pca_sparse_project('usage');
--
""".format(schema_madlib=schema_madlib)

    # The usage text must stay in sync with the tables created by
    # pca_project_wrap: the summary table stores exec_time as FLOAT8
    # (i.e. DOUBLE PRECISION), not INTEGER. Every argument except the last
    # one is followed by a comma so the example is valid SQL.
    return """
----------------------------------------------------------------
Usage
----------------------------------------------------------------
SELECT {schema_madlib}.pca_sparse_project (
'tbl_source', -- Data table
'pc_table', -- Table with principal components
-- (obtained as output from pca_train)
'tbl_result', -- Result table
'row_id', -- Name of the column containing the row_id
'col_id', -- Name of the column containing the col_id
'val_id', -- Name of the column containing the val_id
'row_dim', -- Row dimension of the sparse matrix
'col_dim', -- Column dimension of the sparse matrix
-- Optional Parameters
----------------------------------------------------------------
'tbl_residual', -- Residual table (Default: NULL)
'tbl_result_summary' -- Result summary table (Default : NULL)
);
Note that if the principal components in 'pc_table' were learnt using
grouping_cols in {schema_madlib}.pca_train(), the tbl_source used
here must also have those grouping columns. This will fail otherwise.
Output Tables
----------------------------------------------------------------
The output is divided into three tables (two of which are optional)
-----------------------------------------------------------------------------------------
The output table ('tbl_result' above) encodes a dense matrix
with the projection onto the principal components. The matrix contains
the following columns:
'row_id' INTEGER, -- Row id of the output matrix
'row_vec' DOUBLE PRECISION[], -- A vector containing elements in the row of the matrix
grouping_col -- The grouping columns present in the 'pc_table', if any
-----------------------------------------------------------------------------------------
The residual table ('tbl_residual' above) encodes a dense residual
matrix which has the following columns
'row_id' INTEGER, -- Row id of the output matrix
'row_vec' DOUBLE PRECISION[], -- A vector containing elements in the row of the matrix
grouping_col -- The grouping columns present in the 'pc_table', if any
-----------------------------------------------------------------------------------------
The result summary table ('tbl_result_summary' above) has the following columns
'exec_time' DOUBLE PRECISION, -- Wall clock time (ms) of the function.
'residual_norm' DOUBLE PRECISION, -- Absolute error of the residuals
'relative_residual_norm' DOUBLE PRECISION, -- Relative error of the residuals
grouping_col -- The grouping columns present in the 'pc_table', if any
----------------------------------------------------------------
""".format(schema_madlib=schema_madlib)
def _validate_args(schema_madlib,
                   source_table,
                   pc_table,
                   out_table,
                   row_id,
                   col_id=None,
                   val_id=None,
                   row_dim=None,
                   col_dim=None,
                   residual_table=None,
                   result_summary_table=None):
    """
    Validates all arguments passed to the PCA projection functions.

    Every check goes through _assert, so the first failing check aborts
    validation with the corresponding "PCA error: ..." message.

    Args:
        @param schema_madlib  Name of MADlib schema
        @param source_table  Name of the input table (containing data to project)
        @param pc_table  Name of table with principal components (output by
                         the training function); a companion table named
                         pc_table + "_mean" must exist as well
        @param out_table  Name of output table to store projection result
        @param row_id  Name of the row_id column
        @param col_id  Name of the col_id column (only for sparse matrices)
        @param val_id  Name of the val_id column (only for sparse matrices)
        @param row_dim  Number of rows in input matrix (only for sparse matrices)
        @param col_dim  Number of columns in input matrix (only for sparse matrices)
        @param residual_table  Name of the residual table (to store error in projection)
        @param result_summary_table  Name of result summary table

    Returns:
        None

    Throws:
        plpy.error if any argument is invalid
    """
    # Input tables: source data, principal components and column means.
    _assert(source_table is not None and table_exists(source_table),
            "PCA error: Source data table does not exist!")
    _assert(pc_table is not None and table_exists(pc_table),
            "PCA error: Principal comp. table does not exist!")
    _assert(table_exists(add_postfix(pc_table, "_mean")),
            "PCA error: Source data table column means does not exist!")

    # The output table name must be non-empty and must not exist yet.
    _assert(out_table and out_table.strip(),
            "PCA error: Invalid output table name.")
    _assert(not table_exists(out_table, only_first_schema=True),
            "PCA error: Output table {0} already exists!".format(str(out_table)))

    # Optional result summary table: non-empty name, must not exist yet.
    if result_summary_table is not None:
        _assert(result_summary_table.strip(),
                "PCA error: Invalid result summary table name!")
        _assert(not table_exists(result_summary_table, only_first_schema=True),
                "PCA error: Result summary table {0} already exists!".
                format(result_summary_table))

    # Optional residual table: non-empty name, must not exist yet.
    if residual_table is not None:
        _assert(residual_table.strip(),
                "PCA error: Invalid residual table name!")
        _assert(not table_exists(residual_table, only_first_schema=True),
                "PCA error: Residual table {0} already exists!".
                format(residual_table))

    # Check that the row_id column exists. A missing (None) row_id is
    # reported through the same message instead of being handed to the
    # column lookup.
    _assert(row_id is not None and
            columns_exist_in_table(source_table, [row_id], schema_madlib),
            "PCA error: {1} column does not exist in {0}!".
            format(source_table, "NULL" if row_id is None else row_id))

    # For sparse inputs: col_id and val_id must be given together, both
    # columns must exist and the matrix dimensions must be positive.
    if col_id or val_id:
        _assert(col_id,
                "PCA error: Column ID should be provided if value ID is input!")
        _assert(val_id,
                "PCA error: Value ID should be provided if column ID is input!")
        _assert(columns_exist_in_table(source_table, [col_id], schema_madlib),
                "PCA error: {1} column does not exist in {0}!".
                format(source_table, col_id))
        _assert(columns_exist_in_table(source_table, [val_id], schema_madlib),
                "PCA error: {1} column does not exist in {0}!".
                format(source_table, val_id))
        # Guard against None first: comparing None with an int raises
        # TypeError on Python 3 instead of producing the PCA error below.
        _assert(row_dim is not None and col_dim is not None and
                row_dim > 0 and col_dim > 0,
                "PCA error: row_dim/col_dim should be positive integer")
# ------------------------------------------------------------------------
def pca_sparse_project(schema_madlib,
                       source_table,
                       pc_table,
                       out_table,
                       row_id,
                       col_id,
                       val_id,
                       row_dim,
                       col_dim,
                       residual_table,
                       result_summary_table,
                       **kwargs):
    """
    Entry point for PCA projection of a matrix stored in sparse format.

    All work is delegated to pca_project_wrap with the sparse flag set and
    the sparse-specific arguments forwarded.

    Args:
        @param schema_madlib  Name of MADlib schema
        @param source_table  Name of the input table (containing data to project)
        @param pc_table  Name of table with principal components (output by the training function)
        @param out_table  Name of output table to store projection result
        @param row_id  Name of the row_id column
        @param col_id  Name of the col_id column
        @param val_id  Name of the val_id column
        @param row_dim  Number of rows in input matrix
        @param col_dim  Number of columns in input matrix
        @param residual_table  Name of the residual table (to store error in projection)
        @param result_summary_table  Name of result summary table

    Returns:
        None

    Throws:
        plpy.error if any argument is invalid
    """
    sparse_options = dict(is_sparse=True,
                          col_id=col_id,
                          val_id=val_id,
                          row_dim=row_dim,
                          col_dim=col_dim)
    pca_project_wrap(schema_madlib, source_table, pc_table, out_table,
                     row_id, residual_table, result_summary_table,
                     **sparse_options)
# ------------------------------------------------------------------------
def pca_project(schema_madlib,
                source_table,
                pc_table,
                out_table,
                row_id,
                residual_table,
                result_summary_table,
                **kwargs):
    """
    Entry point for PCA projection of a matrix stored in dense format.

    All work is delegated to pca_project_wrap, which is left at its default
    (non-sparse) settings.

    Args:
        @param schema_madlib  Name of MADlib schema
        @param source_table  Name of the input table (containing data to project)
        @param pc_table  Name of table with principal components (output by the training function)
        @param out_table  Name of output table to store projection result
        @param row_id  Name of the row_id column
        @param residual_table  Name of the residual table (to store error in projection)
        @param result_summary_table  Name of result summary table

    Returns:
        None

    Throws:
        plpy.error if any argument is invalid
    """
    dense_args = (schema_madlib, source_table, pc_table, out_table,
                  row_id, residual_table, result_summary_table)
    pca_project_wrap(*dense_args)
def pca_project_wrap(schema_madlib, source_table, pc_table, out_table,
                     row_id, residual_table,
                     result_summary_table, is_sparse=False,
                     col_id=None, val_id=None, row_dim=None,
                     col_dim=None, **kwargs):
    """
    Validate the arguments, create the output tables and run the projection
    once per group.

    This wrapper was added to support grouping columns. Every column of
    pc_table that is not one of the known model columns is treated as a
    grouping column. The function then constructs a single query that
    includes a separate "{schema_madlib}._pca_project_union(...)" call for
    each distinct combination of grouping values found in source_table (or
    exactly one call when there are no grouping columns).

    Args:
        @param schema_madlib  Name of MADlib schema
        @param source_table  Name of the input table (containing data to project)
        @param pc_table  Name of table with principal components
        @param out_table  Name of output table to store projection result
        @param row_id  Name of the row_id column
        @param residual_table  Name of the residual table (may be None/empty)
        @param result_summary_table  Name of result summary table (may be None/empty)
        @param is_sparse  True when source_table is in sparse format
        @param col_id  Name of the col_id column (sparse only)
        @param val_id  Name of the val_id column (sparse only)
        @param row_dim  Number of rows in input matrix (sparse only)
        @param col_dim  Number of columns in input matrix (sparse only)

    Returns:
        None

    Throws:
        plpy.error if any argument is invalid, if a grouping column of
        pc_table is missing from source_table, or if grouping is in effect
        and source_table has no rows
    """
    # Reset the message level to avoid random messages
    old_msg_level = plpy.execute("""
SELECT setting
FROM pg_settings
WHERE name='client_min_messages'
""")[0]['setting']
    plpy.execute('SET client_min_messages TO warning')

    if is_sparse:
        _validate_args(schema_madlib, source_table, pc_table, out_table,
                       row_id, col_id, val_id, row_dim, col_dim, residual_table,
                       result_summary_table)
    else:
        _validate_args(schema_madlib, source_table, pc_table, out_table,
                       row_id, None, None, None, None,
                       residual_table, result_summary_table)

    # If we add new columns to the pca_train output table in the future, they should
    # be included in this list:
    pc_table_model_cols = ['row_id', 'principal_components', 'std_dev', 'proportion']
    # Fetch the pc_table column list once; it is needed both to find the
    # grouping columns and to build the per-group copy of pc_table.
    pc_table_cols = get_cols(pc_table)
    grouping_cols_list = [col for col in pc_table_cols
                          if col not in pc_table_model_cols]
    grouping_cols = ', '.join(grouping_cols_list)
    other_columns_in_table = [col for col in get_cols(source_table)
                              if col not in grouping_cols_list]

    grouping_cols_clause = ''
    # Position of every grouping column inside the output tables. Used to
    # emit per-group values in exactly the order the columns are declared.
    grouping_col_position = {}
    if grouping_cols:
        # validate the grouping columns. We currently only support grouping_cols
        # to be column names in the source_table, and not expressions!
        _assert(columns_exist_in_table(source_table, grouping_cols_list, schema_madlib),
                """PCA error: One or more grouping columns in {0} do not exist in {1}, but
the model in {2} was learnt with grouping!""".format(grouping_cols,
                                                     source_table, pc_table))
        distinct_grouping_values = plpy.execute("""
SELECT DISTINCT {grouping_cols} FROM {source_table}
""".format(grouping_cols=grouping_cols, source_table=source_table))
        # Without this check an empty source table would surface as a bare
        # IndexError when the first group is looked up.
        _assert(len(distinct_grouping_values) > 0,
                "PCA error: Source data table {0} has no rows to project!".
                format(source_table))
        cols_names_types = get_cols_and_types(source_table)
        grouping_names_types = [(c_name, c_type)
                                for (c_name, c_type) in cols_names_types
                                if c_name in grouping_cols_list]
        grouping_cols_clause = ', ' + ', '.join(
            [c_name + " " + c_type for (c_name, c_type) in grouping_names_types])
        grouping_col_position = dict(
            (c_name, idx) for idx, (c_name, _) in enumerate(grouping_names_types))

    # Create all output tables
    plpy.execute("""
CREATE TABLE {0} (
row_id INTEGER,
row_vec double precision[]
{1}
) """.format(out_table, grouping_cols_clause))
    if result_summary_table:
        plpy.execute(
            """
CREATE TABLE {0} (
exec_time FLOAT8,
residual_norm FLOAT8,
relative_residual_norm FLOAT8
{1}
) """.format(result_summary_table, grouping_cols_clause))
    else:
        result_summary_table = ''
    if residual_table:
        plpy.execute("""
CREATE TABLE {0} (
row_id INTEGER,
row_vec double precision[]
{1}
) """.format(residual_table, grouping_cols_clause))
    else:
        residual_table = ''

    # declare variables whose values will be different for each group, if
    # grouping_cols is specified
    grouping_where_clause = ''
    sparse_where_condition = ''
    select_grouping_cols = ''
    grouping_cols_values = ''

    other_columns_in_pc_table = [col for col in pc_table_cols
                                 if col not in grouping_cols_list]
    temp_pc_table_columns = ', '.join(other_columns_in_pc_table)
    original_row_id = row_id
    other_columns_in_table.remove(row_id)
    temp_source_table_columns = ','.join(other_columns_in_table)

    if not is_sparse:
        # Placeholders so the generated SQL call is well formed for dense input.
        col_id = 'NULL'
        val_id = 'NULL'
        row_dim = 0
        col_dim = 0

    pca_union_call_list = []
    num_groups = len(distinct_grouping_values) if grouping_cols else 1
    for grp_id in range(num_groups):
        if grouping_cols:
            grp_value_dict = distinct_grouping_values[grp_id]
            # The per-group results are appended to the output tables by
            # position (INSERT ... SELECT / VALUES without a column list), so
            # the values must follow the column order used when the tables
            # were created, not the iteration order of the row dict. Keys
            # that are not recognised keep their relative order at the end.
            group_items = sorted(
                grp_value_dict.items(),
                key=lambda item: grouping_col_position.get(
                    item[0], len(grouping_col_position)))
            # NOTE(review): values are interpolated with str() and without
            # quoting; text or NULL grouping values would not form valid
            # predicates -- confirm which grouping column types are supported.
            where_conditions = ' AND '.join(
                [str(key) + "=" + str(value) for (key, value) in group_items])
            sparse_where_condition = ' AND ' + where_conditions
            grouping_where_clause = ' WHERE ' + where_conditions
            select_grouping_cols = ', ' + ', '.join(
                [str(value) + " AS " + key for (key, value) in group_items])
            grouping_cols_values = ', ' + ', '.join(
                [str(value) for (key, value) in group_items])

        pca_union_call_list.append("""
{schema_madlib}._pca_project_union('{source_table}', '{pc_table}', '{out_table}',
'{row_id}', '{original_row_id}', '{grouping_cols}',
'{grouping_cols_clause}', '{residual_table}', '{result_summary_table}',
{grp_id}, '{grouping_where_clause}', '{sparse_where_condition}','{select_grouping_cols}',
'{grouping_cols_values}', '{temp_source_table_columns}', '{temp_pc_table_columns}',
{is_sparse}, '{col_id}', '{val_id}', {row_dim}, {col_dim})
""".format(schema_madlib=schema_madlib,
           source_table=source_table, pc_table=pc_table,
           out_table=out_table, row_id=row_id,
           original_row_id=original_row_id,
           grouping_cols=grouping_cols,
           grouping_cols_clause=grouping_cols_clause,
           residual_table=residual_table,
           result_summary_table=result_summary_table,
           grp_id=grp_id, grouping_where_clause=grouping_where_clause,
           sparse_where_condition=sparse_where_condition,
           select_grouping_cols=select_grouping_cols,
           grouping_cols_values=grouping_cols_values,
           temp_source_table_columns=temp_source_table_columns,
           temp_pc_table_columns=temp_pc_table_columns, is_sparse=is_sparse,
           col_id=col_id, val_id=val_id, row_dim=row_dim, col_dim=col_dim))

    # "SELECT <query_1>, <query_2>, <query_3>, ..." is expected to run each
    # <query_i> in parallel.
    pca_union_call = 'SELECT ' + ', '.join(pca_union_call_list)
    plpy.execute(pca_union_call)
    plpy.execute("SET client_min_messages TO %s" % old_msg_level)
def _pca_project_union(schema_madlib, source_table, pc_table, out_table,
                       row_id, original_row_id, grouping_cols, grouping_cols_clause,
                       residual_table, result_summary_table, grp_id, grouping_where_clause,
                       sparse_where_condition, select_grouping_cols, grouping_cols_values,
                       temp_source_table_columns, temp_pc_table_columns, is_sparse, col_id,
                       val_id, row_dim, col_dim, **kwargs):
    """
    Run the PCA projection for one group (or for the whole table when no
    grouping is used) and append the results to the shared output tables.

    The function works on temporary per-group copies of the inputs, writes
    the projection into out_table, optionally writes residuals and a summary
    row, and finally drops the temporary tables it created.

    Args:
        @param schema_madlib -- madlib schema name
        @param source_table -- Source table name
        @param pc_table -- Name of the table holding the principal components;
                           the column means are read from pc_table + "_mean"
        @param out_table -- Output table name (must already exist)
        @param row_id -- Column name for the ID for each row
        @param original_row_id -- copy of the row_id originally passed
        @param grouping_cols -- Comma-separated list of grouping columns
                                (empty when no grouping is used)
        @param grouping_cols_clause -- Part of the SQL query to be used with
                                       grouping_cols (not used in this function)
        @param residual_table -- Residual table name (empty if not requested)
        @param result_summary_table -- Table name to store summary of results
                                       (empty if not requested)
        @param grp_id -- a place holder id for each group, used in temp table names
        @param grouping_where_clause -- WHERE clause selecting this group
        @param sparse_where_condition -- extra condition selecting this group,
                                         passed on when copying a sparse source table
        @param select_grouping_cols -- SELECT-list fragment that appends this
                                       group's grouping values
        @param grouping_cols_values -- this group's grouping values, appended
                                       to the summary row
        @param temp_source_table_columns -- column list copied from source_table
                                            into the per-group table (dense only)
        @param temp_pc_table_columns -- column list copied from pc_table into
                                        the per-group table
        @param is_sparse -- specifies if the PCA call is for sparse or dense matrices
        @param col_id -- sparse representation based detail
        @param val_id -- sparse representation based detail
        @param row_dim -- sparse representation based detail
        @param col_dim -- sparse representation based detail

    Returns:
        None
    """
    out_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id)
    if grouping_cols:
        # Restrict the principal components to the rows of this group.
        pc_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id)
        plpy.execute("""
CREATE TABLE {pc_table_grouped} AS
SELECT {temp_pc_table_columns}
FROM {pc_table}
{grouping_where_clause}
""".format(pc_table_grouped=pc_table_grouped,
           pc_table=pc_table, grouping_where_clause=grouping_where_clause,
           temp_pc_table_columns=temp_pc_table_columns))
    else:
        pc_table_grouped = pc_table

    t0 = time.time()  # measure the starting time

    # Per-group temp table that gets replaced by a re-cast copy (dense input
    # only); remembered so that it can be dropped during cleanup.
    superseded_source_table = None

    # Step 1: Validate the input arguments
    if is_sparse:
        # Step 1.1: Create a copy of the sparse matrix and add row_dims and col_dims
        # Warning: This changes the column names of the table
        sparse_table_copy = "pg_temp." + unique_string() + "_sparse_table_copy"
        create_temp_sparse_matrix_table_with_dims(source_table, sparse_table_copy,
                                                  row_id, col_id, val_id,
                                                  row_dim, col_dim, sparse_where_condition)
        validate_sparse(sparse_table_copy,
                        {'row': row_id, 'col': col_id, 'val': val_id},
                        check_col=False)
        # Step 1.2: Densify the input matrix
        x_dense = "pg_temp." + unique_string() + "_dense"
        plpy.execute("""
SELECT {schema_madlib}.matrix_densify(
'{sparse_table_copy}', 'row={row_id}, col={col_id}, val={val_id}',
'{x_dense}', 'row=row_id, col=col_id,val=row_vec')
""".format(schema_madlib=schema_madlib,
           sparse_table_copy=sparse_table_copy, row_id=row_id,
           col_id=col_id, val_id=val_id, x_dense=x_dense))
        plpy.execute("""
DROP TABLE IF EXISTS {0};
""".format(sparse_table_copy))
        source_table_grouped = x_dense
    else:
        # For Dense matrix format only:
        # The original row_id is ignored for all computations. A new table is
        # created whose row_id column is produced by ROW_NUMBER(), so that the
        # ids of each group start at 1 and run up to the number of rows in
        # the group. This mainly supports grouping, where the row_id values
        # of a single group are not expected to form such a range. The temp
        # table is created in the non-grouping case as well; skipping it
        # there would save some computation time.
        # This is not needed for the sparse representation because of the
        # nature of its definition.
        # Preserve the mapping between the new row_id and the original
        # row_id, so the original ids can be restored in the output tables.
        temp_row_id = "original_row_id" + unique_string()
        row_id_map_table = "rowid" + unique_string()
        plpy.execute("""
CREATE TABLE {row_id_map_table} AS
SELECT
{source_table}.{original_row_id} AS {temp_row_id},
{select_clause}
FROM {source_table}
{grouping_where_clause}
""".format(row_id_map_table=row_id_map_table,
           original_row_id=original_row_id,
           temp_row_id=temp_row_id,
           source_table=source_table,
           select_clause=""" ROW_NUMBER() OVER() AS row_id """,
           grouping_where_clause=grouping_where_clause))
        # Creation of this temp table is unnecessary if the scenario does not involve
        # grouping, and/or, the input table had perfect values for the row_id column.
        # This temp table will ensure pca works even when row_id of the source_table
        # does not have serially increasing numbers starting from 1;
        source_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id)
        plpy.execute("""
CREATE TABLE {source_table_grouped} AS
SELECT {row_id_map_table}.row_id, {temp_source_table_columns}
FROM
(
SELECT *
FROM {source_table}
{grouping_where_clause}
) t1
INNER JOIN {row_id_map_table}
ON {row_id_map_table}.{temp_row_id}=t1.{row_id}
""".format(source_table_grouped=source_table_grouped,
           temp_row_id=temp_row_id, row_id_map_table=row_id_map_table, row_id=row_id,
           source_table=source_table, grouping_where_clause=grouping_where_clause,
           temp_source_table_columns=temp_source_table_columns))
        # From here on the generated row_id column is the row identifier.
        row_id = 'row_id'
        # Make sure that the table has row_id and row_vec
        source_table_copy = "pg_temp." + unique_string()
        need_new_column_names = cast_dense_input_table_to_correct_columns(
            schema_madlib, source_table_grouped, source_table_copy, row_id)
        if need_new_column_names:
            # The re-cast copy replaces the per-group table; keep the old
            # name so the table is dropped at the end instead of lingering
            # in pg_temp.
            superseded_source_table = source_table_grouped
            source_table_grouped = source_table_copy
        [row_dim, col_dim] = get_dims(source_table_grouped,
                                      {'row': 'row_id', 'col': 'col_id',
                                       'val': 'row_vec'})
        validate_dense(source_table_grouped,
                       {'row': 'row_id', 'col': 'col_id', 'val': 'row_vec'},
                       check_col=False, row_dim=row_dim)

    # Step 2: Compute the PCA Projection matrix
    # The R code to perform this step is
    #  p <- princomp(mat)
    #  low_rank_representation <- mat %*% p$loadings[,1:k]
    # First normalize the data (Column means)
    scaled_source_table = "pg_temp." + unique_string() + "_scaled_table"
    # A vector of ones is passed as the standard deviation argument.
    x_std_str = _array_to_string([1] * col_dim)
    pc_table_mean = add_postfix(pc_table, "_mean")
    plpy.execute(
        """
CREATE TABLE {scaled_source_table}
m4_ifdef(`__POSTGRESQL__', `', `WITH (appendonly=true)')
AS
SELECT
row_id,
({schema_madlib}.utils_normalize_data(
row_vec,
(select column_mean from {pc_table_mean}
{grouping_where_clause}),
'{x_std_str}'::double precision[]))
AS row_vec
FROM {source_table_grouped}
""".format(schema_madlib=schema_madlib,
           pc_table_mean=pc_table_mean,
           source_table_grouped=source_table_grouped,
           scaled_source_table=scaled_source_table,
           grouping_where_clause=grouping_where_clause,
           x_std_str=x_std_str))
    plpy.execute(
        """
SELECT {schema_madlib}.matrix_mult('{scaled_source_table}',
'trans=false,row=row_id, col=col_id, val=row_vec',
'{pc_table_grouped}',
'trans=TRUE, row=row_id, col=col_id, val=principal_components',
'{out_table_grouped}',
'row=row_id, col=col_id,val=row_vec');
""".format(schema_madlib=schema_madlib,
           scaled_source_table=scaled_source_table,
           pc_table_grouped=pc_table_grouped,
           out_table_grouped=out_table_grouped))

    # Step 3: Compute the Residual table (if required)
    # Residual table: res = mat - proj
    create_residual_table = False
    if residual_table or result_summary_table:
        residual_table_grouped = "pg_temp." + unique_string() + "_temp_residual"
        create_temp_residual_table = False
        if not residual_table:
            # Residuals are only needed for the summary; drop them afterwards.
            create_temp_residual_table = True
        else:
            create_residual_table = True
        approx_table = "pg_temp." + unique_string() + "_approx"
        # Build an approximate reconstruction of the data
        plpy.execute(
            """
SELECT {schema_madlib}.matrix_mult('{out_table_grouped}',
'row=row_id, col=col_id, val=row_vec',
'{pc_table_grouped}',
'row=row_id, col=col_id, val=principal_components',
'{approx_table}',
'row=row_id, col=col_id, val=row_vec');
""".format(schema_madlib=schema_madlib,
           out_table_grouped=out_table_grouped,
           pc_table_grouped=pc_table_grouped,
           approx_table=approx_table))
        # Compute the difference between the reconstruction and real data
        # Note that both the approximation and source data are recentered here
        plpy.execute(
            """
SELECT {schema_madlib}.matrix_scale_and_add(
'{scaled_source_table}',
'row=row_id, col=col_id, val=row_vec',
'{approx_table}',
'row=row_id, col=col_id, val=row_vec',
-1,
'{residual_table_grouped}',
'row=row_id, col=col_id, val=row_vec');
""".format(schema_madlib=schema_madlib,
           scaled_source_table=scaled_source_table,
           approx_table=approx_table,
           residual_table_grouped=residual_table_grouped))

        # Step 4: Compute the results summary table (if required)
        # If the residual table is not asked by the user, but he does ask for
        # result summary table, then we need to compute the residuals
        if result_summary_table:
            source_table_norm = plpy.execute(
                """
SELECT {schema_madlib}.matrix_norm('{source_table_grouped}',
'row=row_id, col=col_id, val=row_vec') as r
""".format(schema_madlib=schema_madlib,
           source_table_grouped=source_table_grouped))[0]['r']
            # Compute the norm of the residual table
            residual_norm = plpy.execute(
                """
SELECT {schema_madlib}.matrix_norm('{residual_table_grouped}',
'row=row_id, col=col_id, val=row_vec') as r
""".format(schema_madlib=schema_madlib,
           residual_table_grouped=residual_table_grouped))[0]['r']
            # Compute the relative error of the norm
            # Prevent division by zero
            if source_table_norm > ZERO_THRESHOLD:
                relative_residual_norm = residual_norm / source_table_norm
            else:
                relative_residual_norm = 0
            # Compute the time in milli-seconds
            t1 = time.time()
            dt = (t1 - t0) * 1000.
            plpy.execute(
                """
INSERT INTO {result_summary_table} VALUES
({dt},
{residual_norm}::double precision,
{relative_residual_norm}::double precision
{grouping_cols_values}
);
""".format(dt=dt, residual_norm=residual_norm,
           result_summary_table=result_summary_table,
           relative_residual_norm=relative_residual_norm,
           grouping_cols_values=grouping_cols_values))
        plpy.execute("""
DROP TABLE IF EXISTS {approx_table};
""".format(approx_table=approx_table))
        if create_temp_residual_table:
            plpy.execute("""
DROP TABLE IF EXISTS {0};
""".format(residual_table_grouped))

    # Append this group's results to the shared output tables.
    if is_sparse:
        ## We don't have to join based on row_id for sparse project.
        if create_residual_table:
            plpy.execute("""
INSERT INTO {residual_table}
SELECT * {select_grouping_cols}
FROM {residual_table_grouped}
""".format(residual_table=residual_table,
           select_grouping_cols=select_grouping_cols,
           residual_table_grouped=residual_table_grouped))
        plpy.execute("""
INSERT INTO {out_table}
SELECT * {select_grouping_cols}
FROM {out_table_grouped}
""".format(out_table=out_table,
           select_grouping_cols=select_grouping_cols,
           out_table_grouped=out_table_grouped))
    else:
        # Dense input: map the generated row_id back to the original row id
        # through the row id map table.
        output_table_cols = get_cols(out_table_grouped)
        output_table_cols.remove('row_id')
        output_table_select_clause = """{row_id_map_table}.{temp_row_id},
{out_table_cols}
{select_grouping_cols}
""".format(row_id_map_table=row_id_map_table,
           temp_row_id=temp_row_id,
           out_table_cols=', '.join(output_table_cols),
           select_grouping_cols=select_grouping_cols)
        if create_residual_table:
            plpy.execute("""
INSERT INTO {residual_table}
SELECT {select_clause}
FROM {residual_table_grouped}
INNER JOIN {row_id_map_table}
ON {row_id_map_table}.row_id={residual_table_grouped}.row_id
""".format(residual_table=residual_table,
           select_clause=output_table_select_clause,
           residual_table_grouped=residual_table_grouped,
           row_id_map_table=row_id_map_table))
        plpy.execute("""
INSERT INTO {out_table}
SELECT {select_clause}
FROM {out_table_grouped}
INNER JOIN {row_id_map_table}
ON {row_id_map_table}.row_id={out_table_grouped}.row_id
""".format(out_table=out_table,
           select_clause=output_table_select_clause,
           out_table_grouped=out_table_grouped,
           row_id_map_table=row_id_map_table))
        plpy.execute("""
DROP TABLE IF EXISTS {0};
""".format(row_id_map_table))

    # Cleanup of the remaining temporary tables.
    if residual_table or result_summary_table:
        plpy.execute("""
DROP TABLE IF EXISTS {0}
""".format(residual_table_grouped))
    plpy.execute("""
DROP TABLE IF EXISTS {0};
DROP TABLE IF EXISTS {1};
DROP TABLE IF EXISTS {2};
""".format(scaled_source_table,
           source_table_grouped, out_table_grouped))
    if superseded_source_table:
        plpy.execute("""
DROP TABLE IF EXISTS {0};
""".format(superseded_source_table))
    if grouping_cols:
        plpy.execute("""
DROP TABLE IF EXISTS {0};
""".format(pc_table_grouped))