"""
@file pca.py_in
@namespace pca
"""
from convex.utils_regularization import __utils_ind_var_scales
from linalg.matrix_ops import get_dims
from linalg.matrix_ops import create_temp_sparse_matrix_table_with_dims
from linalg.matrix_ops import cast_dense_input_table_to_correct_columns
from linalg.matrix_ops import validate_dense
from linalg.matrix_ops import validate_sparse
from utilities.utilities import _array_to_string
from utilities.utilities import add_postfix
from utilities.utilities import __mad_version
from utilities.utilities import unique_string
from utilities.utilities import _assert
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_exists
import time
import plpy
version_wrapper = __mad_version()
string_to_array = version_wrapper.select_vecfunc()
array_to_string = version_wrapper.select_vec_return()
# Validate arguments: Same as pca
# ------------------------------------------------------------------------
def _validate_args(schema_madlib,
source_table,
pc_table,
k,
row_id,
col_id=None,
val_id=None,
row_dim=None,
col_dim=None,
grouping_cols=None,
lanczos_iter=0,
use_correlation=False,
result_summary_table=None):
"""
Validates all arguments passed to the PCA function
Args:
@param schema_madlib Name of the schema where MADlib is installed
@param source_table Name of the source table
        @param pc_table Name of the output table (prefix for the principal component tables)
        @param k Number of principal components to compute
        @param row_id Name of the row_id column
        @param col_id Name of the col_id column (sparse input only)
        @param val_id Name of the val_id column (sparse input only)
        @param row_dim Number of rows in the sparse matrix (sparse input only)
        @param col_dim Number of columns in the sparse matrix (sparse input only)
        @param grouping_cols The columns that the data should be grouped by
        @param lanczos_iter The number of Lanczos iterations to use in the SVD calculation
        @param use_correlation Whether to use the correlation matrix instead of the covariance matrix
@param result_summary_table Name of summary table
Returns:
None
Throws:
plpy.error if any argument is invalid
"""
_assert(source_table is not None and table_exists(source_table),
"PCA error: Source data table {0} does not exist!".
format(str(source_table)))
if k <= 0:
plpy.error("PCA error: k must be a positive integer!")
# confirm output tables are valid
if pc_table:
_assert(not table_exists(pc_table, only_first_schema=True) and
not table_exists(pc_table + '_mean', only_first_schema=True),
"PCA error: Output table {pc_table}/{pc_table}_mean "
"already exist!".format(pc_table=pc_table))
else:
plpy.error("PCA error: Invalid output table prefix!")
_assert(columns_exist_in_table(source_table, [row_id], schema_madlib),
"PCA error: {1} column does not exist in {0}!".
format(source_table, "NULL" if row_id is None else row_id))
    if grouping_cols:
        plpy.error("PCA error: Grouping columns are not currently supported! "
                   "This value must be set to NULL")
    if lanczos_iter < 0:
        plpy.error("PCA error: lanczos_iter can't be negative! (Use zero for "
                   "the default value.) The provided value is {0}".format(lanczos_iter))
# If using sparse matrices, check that the parameters are reasonable
if col_id or val_id:
        if not col_id:
            plpy.error("PCA error: Column ID should be provided if "
                       "value ID is input!")
        if not val_id:
            plpy.error("PCA error: Value ID should be provided if "
                       "column ID is input!")
_assert(columns_exist_in_table(source_table, [col_id], schema_madlib),
"PCA error: {1} column does not exist in {0}!".
format(source_table, col_id))
_assert(columns_exist_in_table(source_table, [val_id], schema_madlib),
"PCA error: {1} column does not exist in {0}!".
format(source_table, val_id))
        if not col_dim:
            plpy.error("PCA error: Column dimensions should be provided if "
                       "using sparse matrices!")
        if not row_dim:
            plpy.error("PCA error: Row dimensions should be provided if "
                       "using sparse matrices!")
if row_dim <= 0:
plpy.error("PCA error: The row dimension must be larger than 0!")
if col_dim <= 0:
plpy.error("PCA error: The column dimension must be larger than 0!")
validate_sparse(source_table,
{'row': row_id, 'col': col_id, 'val': val_id},
checkCol=False)
    if use_correlation:
        plpy.error("PCA error: Using the correlation matrix is not enabled! "
                   "This value must be set to FALSE")
if result_summary_table:
if not result_summary_table.strip():
plpy.error("PCA error: Invalid result summary table name!")
        _assert(not table_exists(result_summary_table, only_first_schema=True),
                "PCA error: Result summary table {0} "
                "already exists!".format(result_summary_table))
# ========================================================================
def _recenter_data(schema_madlib, source_table, output_table, row_id,
col_name, dimension):
"""
    Recenters the data table by subtracting column means.
    The output is stored in output_table. The input table must have an
    array column that contains the data.
    Args:
        @param schema_madlib Name of the schema where MADlib is installed
        @param source_table Name of the source table
        @param output_table Name of the output table
        @param row_id Name of the row_id column
        @param col_name Name of the array column in the input table
        @param dimension Dimension (length) of the data arrays
    Returns:
        The column means, formatted as a string for use in SQL
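    Example (hypothetical table names, for illustration only):
        _recenter_data(schema_madlib, 'mat', 'mat_centered',
                       'row_id', 'row_vec', 10)
        # -> creates mat_centered(row_id, row_vec) with column means subtracted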
"""
# Step 1: Compute column mean values
x_scales = __utils_ind_var_scales(tbl_data=source_table,
col_ind_var=col_name,
dimension=dimension,
schema_madlib=schema_madlib)
x_mean_str = _array_to_string(x_scales["mean"])
x_std_str = _array_to_string([1] * dimension)
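    # A unit standard deviation means utils_normalize_data reduces to a pure
    # mean subtraction (row_vec - mean): the data is recentered but not
    # rescaled, which is what PCA on the covariance matrix requires.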
# Step 2: Rescale the matrices
plpy.execute(
"""
CREATE TABLE {output_table} AS
SELECT
{row_id} as row_id,
({schema_madlib}.utils_normalize_data(
{col_name},
'{x_mean_str}'::double precision[],
'{x_std_str}'::double precision[]))
AS row_vec
FROM {source_table}
""".format(schema_madlib=schema_madlib,
col_name=col_name,
row_id=row_id,
source_table=source_table,
output_table=output_table,
x_mean_str=x_mean_str,
x_std_str=x_std_str))
return x_mean_str
# ------------------------------------------------------------------------
def pca_sparse(schema_madlib,
source_table,
pc_table,
row_id,
col_id,
val_id,
row_dim,
col_dim,
k,
grouping_cols,
lanczos_iter,
use_correlation,
result_summary_table,
**kwargs):
"""
    Compute the PCA of a sparse matrix in source_table.
    This function is the specific call for sparse matrices. It creates a
    table of principal components and a companion table of column means.
Args:
        @param schema_madlib Name of the schema where MADlib is installed
        @param source_table Name of the source table (sparse matrix)
        @param pc_table Name of the output table of principal components
        @param row_id Name of the row_id column
        @param col_id Name of the col_id column
        @param val_id Name of the val_id column
        @param row_dim Number of rows in the sparse matrix
        @param col_dim Number of columns in the sparse matrix
        @param k Number of principal components to compute
        @param grouping_cols Columns to group by (currently unsupported; must be NULL)
        @param lanczos_iter Number of Lanczos iterations for the SVD calculation
        @param use_correlation Whether to use the correlation matrix (currently unsupported)
        @param result_summary_table Name of the result summary table
Returns:
None
"""
startTime = time.time()
    # Save the current message level, then suppress notices while running
old_msg_level = plpy.execute("""
SELECT setting
FROM pg_settings
WHERE name='client_min_messages'
""")[0]['setting']
plpy.execute('SET client_min_messages TO warning')
# Step 1: Validate the input arguments
_validate_args(schema_madlib, source_table, pc_table, k, row_id, col_id,
val_id, row_dim, col_dim, grouping_cols, lanczos_iter,
use_correlation, result_summary_table)
# Step 2: Densify the matrix
    # We densify the matrix because the recentering process generates a
    # dense matrix anyway, so we densify up front and delegate to regular PCA.
# First we must copy the sparse matrix and add in the dimension information
sparse_temp = "pg_temp." + unique_string() + "_sparse"
    # Add in the dimension information needed by the densifying process
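    # A minimal sketch of the expected input, in coordinate (COO) format
    # (hypothetical values):
    #   row_id | col_id | val_id
    #        1 |      2 |    4.5
    #        3 |      1 |   -1.0
    # The copy also carries an entry at (row_dim, col_dim) so the densifying
    # step can infer the full matrix dimensions.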
create_temp_sparse_matrix_table_with_dims(source_table, sparse_temp,
row_id, col_id, val_id,
row_dim, col_dim)
x_dense = "pg_temp." + unique_string() + "_dense"
plpy.execute("""
SELECT {schema_madlib}.matrix_densify(
'{sparse_temp}',
'row={row_id}, col={col_id}, val={val_id}',
'{x_dense}', 'row=row_id, val=row_vec')
""".format(**locals()))
# Step 3: Pass the densified matrix to regular PCA
pca(schema_madlib, x_dense, pc_table, 'row_id',
k, grouping_cols, lanczos_iter, use_correlation,
result_summary_table)
# Step 4: Clean up
plpy.execute("""
DROP TABLE IF EXISTS {x_dense};
DROP TABLE IF EXISTS {sparse_temp};
""".format(x_dense=x_dense, sparse_temp=sparse_temp))
if result_summary_table:
stopTime = time.time()
dt = (stopTime - startTime) * 1000.
summary_table_tmp_name = unique_string()
plpy.execute(
"""
ALTER TABLE {result_summary_table}
RENAME TO {tmp_name};
""".format(result_summary_table=result_summary_table,
tmp_name=summary_table_tmp_name))
plpy.execute(
"""
CREATE TABLE {result_summary_table} AS
SELECT
rows_used,
{dt} AS "exec_time (ms)",
iter,
recon_error,
relative_recon_error,
use_correlation
FROM {tmp_name};
""".format(result_summary_table=result_summary_table,
dt=str(dt), tmp_name=summary_table_tmp_name))
plpy.execute("DROP TABLE {tmp_name};".format(
tmp_name=summary_table_tmp_name))
plpy.execute("SET client_min_messages TO %s" % old_msg_level)
# ------------------------------------------------------------------------
def pca(schema_madlib, source_table, pc_table, row_id,
k, grouping_cols, lanczos_iter, use_correlation,
result_summary_table,
**kwargs):
"""
    Compute the PCA of the dense matrix in source_table.
    This function is the specific call for dense matrices. It creates a
    table of principal components and a companion table of column means.
Args:
        @param schema_madlib Name of the schema where MADlib is installed
        @param source_table Name of the source table (dense matrix)
        @param pc_table Name of the output table of principal components
        @param row_id Name of the row_id column
        @param k Number of principal components to compute
        @param grouping_cols Columns to group by (currently unsupported; must be NULL)
        @param lanczos_iter Number of Lanczos iterations for the SVD calculation
        @param use_correlation Whether to use the correlation matrix (currently unsupported)
        @param result_summary_table Name of the result summary table
Returns:
None
"""
startTime = time.time()
    # Save the current message level, then suppress notices while running
old_msg_level = plpy.execute("""
SELECT setting
FROM pg_settings
WHERE name='client_min_messages'
""")[0]['setting']
plpy.execute('SET client_min_messages TO warning')
# Step 1: Validate the input arguments
_validate_args(schema_madlib, source_table, pc_table, k,
row_id, None, None, None, None,
grouping_cols, lanczos_iter, use_correlation,
result_summary_table)
# Make sure that the table has row_id and row_vec
    source_table_copy = "pg_temp." + unique_string() + "_reformatted_names"
created_new_table = cast_dense_input_table_to_correct_columns(
schema_madlib, source_table, source_table_copy, row_id)
    if created_new_table:
source_table = source_table_copy
[row_dim, col_dim] = get_dims(source_table,
{'row': 'row_id', 'col': 'col_id',
'val': 'row_vec'})
validate_dense(source_table,
{'row': 'row_id', 'val': 'row_vec'},
checkCol=False, row_dim=row_dim)
    # If lanczos_iter is 0, fall back to the default number of iterations
if lanczos_iter == 0:
lanczos_iter = min(k + 40, min(col_dim, row_dim))
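        # e.g. for a 1000 x 50 matrix with k = 5 this gives
        # min(5 + 40, min(50, 1000)) = 45 Lanczos iterations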
# Note: we currently don't support grouping columns or correlation matrices
if grouping_cols is None and not use_correlation:
# Step 2: Normalize the data (Column means)
dimension = col_dim
scaled_source_table = "pg_temp." + unique_string() + "_scaled_table"
column_mean_str = _recenter_data(schema_madlib,
source_table,
scaled_source_table,
'row_id',
'row_vec',
dimension)
# Step 3: Create temporary output & result summary table
svd_output_temp_table = "pg_temp." + unique_string() + "_svd_output_table"
if result_summary_table is None:
result_summary_table_string = ''
else:
result_summary_table_string = ", '{0}'".format(result_summary_table)
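    # When a summary table is requested, this splices an extra argument into
    # the svd() call below, e.g. ", 'my_summary_table'" (hypothetical name)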
# Step 4: Perform SVD
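    # PCA via SVD: for the centered data matrix X (n rows, p columns),
    #     X = U * S * V^T
    # The rows of V^T are the principal component directions, and the
    # covariance eigenvalues relate to the singular values by
    #     lambda_i = s_i^2 / (n - 1)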
plpy.execute(
"""
SELECT {schema_madlib}.svd('{scaled_source_table}',
'{svd_output_temp_table}',
'row_id',
{k},
{lanczos_iter}
{result_summary_table_string})
""".format(schema_madlib=schema_madlib,
scaled_source_table=scaled_source_table,
svd_output_temp_table=svd_output_temp_table,
k=k,
lanczos_iter=lanczos_iter,
result_summary_table_string=result_summary_table_string))
# Step 5: Transpose SVD output matrix
    svd_v_transpose = "pg_temp." + unique_string() + "_transpose"
svd_output_temp_table_s = add_postfix(svd_output_temp_table, "_s")
svd_output_temp_table_v = add_postfix(svd_output_temp_table, "_v")
svd_output_temp_table_u = add_postfix(svd_output_temp_table, "_u")
plpy.execute(
"""
SELECT {schema_madlib}.matrix_trans(
'{svd_output_temp_table_v}',
'row=row_id, col=col_id, val=row_vec',
'{svd_v_transpose}', 'row=row_id, col=col_id, val=row_vec');
""".format(svd_output_temp_table_v=svd_output_temp_table_v,
svd_v_transpose=svd_v_transpose,
schema_madlib=schema_madlib)
)
# Step 6: Insert the output of SVD into the PCA table
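    # Each output row holds one principal component direction (a row of V^T)
    # in row_vec; value is the matching singular value s_i, stored scaled as
    # s_i / sqrt(n - 1) in the eigen_values column.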
plpy.execute(
"""
        CREATE TABLE {pc_table} AS
        SELECT {svd_v_transpose}.row_id,
               row_vec AS principal_components,
               value / sqrt({row_dim} - 1) AS eigen_values
        FROM {svd_v_transpose},
             {svd_output_temp_table_s}
        WHERE ({svd_v_transpose}.row_id = {svd_output_temp_table_s}.row_id)
              AND ({svd_v_transpose}.row_id <= {k})
              AND value IS NOT NULL
""".format(svd_output_temp_table_s=svd_output_temp_table_s,
k=k,
svd_v_transpose=svd_v_transpose,
pc_table=pc_table,
row_dim=row_dim))
    # Output the column means
pc_table_mean = add_postfix(pc_table, "_mean")
plpy.execute(
"""
DROP TABLE IF EXISTS {pc_table_mean};
CREATE TABLE {pc_table_mean} AS
SELECT '{column_mean_str}'::FLOAT8[] AS column_mean
""".format(pc_table_mean=pc_table_mean,
column_mean_str=column_mean_str))
# Step 7: Append to the SVD summary table to get the PCA summary table
if result_summary_table:
stopTime = time.time()
dt = (stopTime - startTime) * 1000.
summary_table_tmp_name = unique_string()
plpy.execute(
"""
ALTER TABLE {result_summary_table}
RENAME TO {tmp_name};
""".format(result_summary_table=result_summary_table,
tmp_name=summary_table_tmp_name))
plpy.execute(
"""
CREATE TABLE {result_summary_table} AS
SELECT
rows_used,
{dt} AS "exec_time (ms)",
iter,
recon_error,
relative_recon_error,
{use_correlation} AS use_correlation
FROM {tmp_name};
""".format(result_summary_table=result_summary_table,
dt=str(dt), use_correlation=bool(use_correlation),
tmp_name=summary_table_tmp_name))
plpy.execute("DROP TABLE {tmp_name};".format(
tmp_name=summary_table_tmp_name))
# Step 8: Output handling & cleanup
plpy.execute(
"""
DROP TABLE IF EXISTS {svd_v_transpose};
DROP TABLE IF EXISTS {source_table_copy};
DROP TABLE IF EXISTS {svd_output_temp_table};
DROP TABLE IF EXISTS {svd_output_temp_table_s};
DROP TABLE IF EXISTS {svd_output_temp_table_u};
DROP TABLE IF EXISTS {svd_output_temp_table_v};
DROP TABLE IF EXISTS {scaled_source_table};
""".format(svd_output_temp_table=svd_output_temp_table,
svd_output_temp_table_s=svd_output_temp_table_s,
svd_output_temp_table_u=svd_output_temp_table_u,
svd_output_temp_table_v=svd_output_temp_table_v,
scaled_source_table=scaled_source_table,
svd_v_transpose=svd_v_transpose,
source_table_copy=source_table_copy))
plpy.execute("SET client_min_messages TO %s" % old_msg_level)
def pca_sparse_help_message(schema_madlib, message=None, **kwargs):
"""
Given a help string, provide usage information
Args:
@param schema_madlib Name of the MADlib schema
@param message Helper message to print
    Returns:
        A help string: detailed usage if message is 'usage', 'help', or '?',
        otherwise a short module summary
"""
if message is not None and \
message.lower() in ("usage", "help", "?"):
return """
-----------------------------------------------------------------------
USAGE
-----------------------------------------------------------------------
SELECT {schema_madlib}.pca_sparse_train(
    source_table        -- TEXT,    Name of the data table
    pc_table            -- TEXT,    Name of the table containing the principal components
    row_id              -- TEXT,    Column name for the row coordinates
    col_id              -- TEXT,    Column name for the column coordinates
    val_id              -- TEXT,    Column name for the sparse values
    row_dim             -- INTEGER, The number of rows in the sparse matrix
    col_dim             -- INTEGER, The number of columns in the sparse matrix
    k                   -- INTEGER, Number of principal components to compute
    [
    grouping_cols       -- TEXT,    Comma-separated list of grouping columns
                                    (Default: NULL)
    lanczos_iter        -- INTEGER, The number of Lanczos iterations to use in the
                                    SVD calculation (Default: the minimum of the
                                    smallest input matrix dimension and k + 40)
    use_correlation     -- BOOLEAN, If TRUE, the correlation matrix is used instead
                                    of the covariance matrix (Default: FALSE)
    rslt_summary_table  -- TEXT,    Table name to store the summary of results
                                    (Default: NULL)
]
);
-------------------------------------------------------------------------
OUTPUT TABLES
-------------------------------------------------------------------------
The output table ("pc_table" above) has the following columns:
        row_id                -- INTEGER, The ranking of the eigenvalues
        principal_components  -- FLOAT[], The principal components
        eigen_values          -- FLOAT[], The eigenvalues associated with each principal component
A secondary output table named "pc_table"_mean is also generated.
This table has only the single column:
column_mean -- FLOAT[], The column means of the input data
-------------------------------------------------------------------------
RESULT SUMMARY TABLE
-------------------------------------------------------------------------
    The result summary table ("rslt_summary_table" above) has the following columns:
        rows_used             -- INTEGER, Number of rows used in the PCA calculation
        "exec_time (ms)"      -- FLOAT,   Number of milliseconds the PCA calculation took
        use_correlation       -- BOOLEAN, Value of the use_correlation parameter
        iter                  -- INTEGER, Number of iterations the SVD took to converge
        recon_error           -- FLOAT,   Absolute error in the SVD approximation
        relative_recon_error  -- FLOAT,   Relative error in the SVD approximation
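    -------------------------------------------------------------------------
    EXAMPLE (illustrative table names)
    -------------------------------------------------------------------------
    SELECT {schema_madlib}.pca_sparse_train('my_sparse_mat', 'my_pc_table',
                                            'row_id', 'col_id', 'val',
                                            100, 50, 5);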
""".format(schema_madlib=schema_madlib)
else:
return """
Principal component analysis (PCA) is a mathematical procedure that uses an
orthogonal transformation to convert a set of observations of possibly
correlated variables into a set of values of linearly uncorrelated variables
called principal components. This transformation is defined in such a way that
the first principal component has the largest possible variance (i.e.,
accounts for as much of the variability in the data as possible), and each
succeeding component in turn has the highest variance possible under the
constraint that it be orthogonal to (i.e., uncorrelated with) the preceding
components.
For an overview on usage, run: SELECT {schema_madlib}.pca_sparse_train('usage');
""".format(schema_madlib=schema_madlib)
def pca_help_message(schema_madlib, message=None, **kwargs):
"""
Given a help string, provide usage information
Args:
@param schema_madlib Name of the MADlib schema
@param message Helper message to print
    Returns:
        A help string: detailed usage if message is 'usage', 'help', or '?',
        otherwise a short module summary
"""
if message is not None and \
message.lower() in ("usage", "help", "?"):
return """
-----------------------------------------------------------------------
USAGE
-----------------------------------------------------------------------
SELECT {schema_madlib}.pca_train(
    source_table        -- TEXT,    Name of the data table
    pc_table            -- TEXT,    Name of the table containing the principal components
    row_id              -- TEXT,    Column name for the row coordinates
    k                   -- INTEGER, Number of principal components to compute
    [
    grouping_cols       -- TEXT,    Comma-separated list of grouping columns
                                    (Default: NULL)
    lanczos_iter        -- INTEGER, The number of Lanczos iterations to use in the
                                    SVD calculation (Default: the minimum of the
                                    smallest input matrix dimension and k + 40)
    use_correlation     -- BOOLEAN, If TRUE, the correlation matrix is used instead
                                    of the covariance matrix (Default: FALSE)
    rslt_summary_table  -- TEXT,    Table name to store the summary of results
                                    (Default: NULL)
]
);
-------------------------------------------------------------------------
OUTPUT TABLES
-------------------------------------------------------------------------
The output table ("pc_table" above) has the following columns:
        row_id                -- INTEGER, The ranking of the eigenvalues
        principal_components  -- FLOAT[], The principal components
        eigen_values          -- FLOAT[], The eigenvalues associated with each principal component
A secondary output table named "pc_table"_mean is also generated.
This table has only the single column:
column_mean -- FLOAT[], The column means of the input data
-------------------------------------------------------------------------
RESULT SUMMARY TABLE
-------------------------------------------------------------------------
    The result summary table ("rslt_summary_table" above) has the following columns:
        rows_used             -- INTEGER, Number of rows used in the PCA calculation
        "exec_time (ms)"      -- FLOAT,   Number of milliseconds the PCA calculation took
        use_correlation       -- BOOLEAN, Value of the use_correlation parameter
        iter                  -- INTEGER, Number of iterations the SVD took to converge
        recon_error           -- FLOAT,   Absolute error in the SVD approximation
        relative_recon_error  -- FLOAT,   Relative error in the SVD approximation
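    -------------------------------------------------------------------------
    EXAMPLE (illustrative table names)
    -------------------------------------------------------------------------
    SELECT {schema_madlib}.pca_train('my_dense_mat', 'my_pc_table', 'row_id', 5);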
""".format(schema_madlib=schema_madlib)
else:
return """
Principal component analysis (PCA) is a mathematical procedure that uses an
orthogonal transformation to convert a set of observations of possibly
correlated variables into a set of values of linearly uncorrelated variables
called principal components. This transformation is defined in such a way that
the first principal component has the largest possible variance (i.e.,
accounts for as much of the variability in the data as possible), and each
succeeding component in turn has the highest variance possible under the
constraint that it be orthogonal to (i.e., uncorrelated with) the preceding
components.
For an overview on usage, run: SELECT {schema_madlib}.pca_train('usage');
""".format(schema_madlib=schema_madlib)