"""
@file pca.py_in
@namespace pca
"""
from convex.utils_regularization import utils_ind_var_scales
from linalg.matrix_ops import get_dims
from linalg.matrix_ops import create_temp_sparse_matrix_table_with_dims
from linalg.matrix_ops import cast_dense_input_table_to_correct_columns
from linalg.matrix_ops import validate_dense
from linalg.matrix_ops import validate_sparse
from linalg.svd import create_summary_table
from linalg.svd import _svd_lower_wrap
from linalg.svd import _svd_upper_wrap
from utilities.utilities import _array_to_string
from utilities.utilities import add_postfix
from utilities.utilities import __mad_version
from utilities.utilities import unique_string, split_quoted_delimited_str
from utilities.utilities import _assert
from utilities.validate_args import get_cols, get_cols_and_types
from utilities.control import MinWarning
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_exists
import time
import plpy
version_wrapper = __mad_version()
string_to_array = version_wrapper.select_vecfunc()
array_to_string = version_wrapper.select_vec_return()
def pca_sparse(schema_madlib, source_table, pc_table, row_id,
col_id, val_id, row_dim, col_dim, k, grouping_cols,
lanczos_iter, use_correlation, result_summary_table,
variance, **kwargs):
"""
Args:
@param schema_madlib
@param source_table
@param pc_table
@param row_id
@param col_id
@param val_id
@param row_dim
@param col_dim
@param k
@param grouping_cols
@param lanczos_iter
@param use_correlation
@param result_summary_table
@param variance
Returns:
None
"""
pca_wrap(schema_madlib, source_table, pc_table, row_id,
k, grouping_cols, lanczos_iter, use_correlation,
result_summary_table, variance, True, col_id,
val_id, row_dim, col_dim)
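# A minimal illustrative call from SQL (hypothetical schema and table
# names), matching the usage documented in pca_sparse_help_message():
#   SELECT madlib.pca_sparse_train('mat_sparse', 'result_table', 'row_id',
#                                  'col_id', 'value', 1000, 300, 10);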
# ------------------------------------------------------------------------
def pca(schema_madlib, source_table, pc_table, row_id,
k, grouping_cols, lanczos_iter, use_correlation,
result_summary_table, variance, **kwargs):
"""
Args:
@param schema_madlib
@param source_table
@param pc_table
@param row_id
@param k
@param grouping_cols
@param lanczos_iter
@param use_correlation
@param result_summary_table
@param variance
Returns:
None
"""
pca_wrap(schema_madlib, source_table, pc_table, row_id,
k, grouping_cols, lanczos_iter, use_correlation,
result_summary_table, variance)
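# A minimal illustrative call from SQL (hypothetical schema and table
# names), matching the usage documented in pca_help_message():
#   SELECT madlib.pca_train('mat', 'result_table', 'id', 10);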
# ------------------------------------------------------------------------
def pca_wrap(schema_madlib, source_table, pc_table, row_id,
k, grouping_cols, lanczos_iter, use_correlation,
result_summary_table, variance, is_sparse=False, col_id=None,
val_id=None, row_dim=None, col_dim=None, **kwargs):
"""
This wrapper was added to support grouping columns. This
function does the necessary pre-processing for handling
grouping_cols, if set. It then constructs a single query
that includes a separate "madlib._pca_union(...)" for each
group.
"""
# Reset the message level to avoid random messages
with MinWarning('warning'):
grouping_cols_list = []
if is_sparse:
_validate_args(schema_madlib, source_table, pc_table, k, row_id, col_id,
val_id, row_dim, col_dim, lanczos_iter,
use_correlation, result_summary_table, variance)
else:
_validate_args(schema_madlib, source_table, pc_table, k,
row_id, None, None, None, None,
lanczos_iter, use_correlation,
result_summary_table, variance)
if grouping_cols:
# Validate the grouping columns. We currently only support grouping_cols
# that are column names in source_table, not expressions.
grouping_cols_list = split_quoted_delimited_str(grouping_cols)
_assert(columns_exist_in_table(source_table, grouping_cols_list, schema_madlib),
"PCA error: One or more grouping columns in {0} do not exist!".
format(grouping_cols))
distinct_grouping_values = plpy.execute(
"SELECT DISTINCT {grouping_cols} FROM {source_table}".
format(grouping_cols=grouping_cols, source_table=source_table))
else:
grouping_cols = ''
other_columns_in_table = [col for col in get_cols(source_table)
if col not in grouping_cols_list]
grouping_cols_clause = ''
if grouping_cols_list:
cols_names_types = get_cols_and_types(source_table)
grouping_cols_clause = ', ' + ', '.join(
[c_name + " " + c_type for (c_name, c_type) in cols_names_types
if c_name in grouping_cols_list])
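# For example, hypothetical grouping columns "region, dept" of type text
# produce grouping_cols_clause = ", region text, dept text".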
# Create all output tables
plpy.execute("""
CREATE TABLE {pc_table} (
row_id INTEGER,
principal_components double precision[],
std_dev double precision,
proportion double precision
{grouping_cols_clause}
)
""".format(pc_table=pc_table,
grouping_cols_clause=grouping_cols_clause))
pc_table_mean = add_postfix(pc_table, "_mean")
plpy.execute("DROP TABLE IF EXISTS {0}".format(pc_table_mean))
plpy.execute("""
CREATE TABLE {pc_table_mean} (
column_mean double precision[]
{grouping_cols_clause}
)
""".format(pc_table_mean=pc_table_mean, grouping_cols_clause=grouping_cols_clause))
if result_summary_table:
plpy.execute("DROP TABLE IF EXISTS {0}".format(result_summary_table))
plpy.execute("""
CREATE TABLE {0} (
rows_used INTEGER,
"exec_time (ms)" numeric,
iter INTEGER,
recon_error double precision,
relative_recon_error double precision,
use_correlation boolean
{1}
)
""".format(result_summary_table, grouping_cols_clause))
else:
result_summary_table = ''
# declare variables whose values will be different for each group, if
# grouping_cols is specified
grouping_where_clause = ''
sparse_where_condition = ''
select_grouping_cols = ''
temp_table_columns = ''
result_summary_table_temp = ''
# For dense matrix format only:
# We can now ignore the original row_id for all computations, since we
# create a new table whose row_id column has no duplicates and ranges
# from 1 to the number of rows in the group/table. This mainly supports
# the grouping scenario, where each group's row_id values might not
# range from 1 to the number of rows in that group; the same behavior
# simply extends to the non-grouping scenario as well. If correcting
# the row_id column is unimportant in non-grouping cases, we could skip
# creating the temp table and save some computation time, but at the
# moment the code creates it even in the non-grouping scenario.
# We don't need to do this for the sparse representation because of the
# nature of its definition.
other_columns_in_table.remove(row_id)
temp_table_columns = (" ROW_NUMBER() OVER() AS row_id, " +
','.join(other_columns_in_table))
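# For example, with hypothetical source columns (id, row_vec) and
# row_id='id', temp_table_columns becomes
# " ROW_NUMBER() OVER() AS row_id, row_vec".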
pca_union_call_list = []
grp_id = 0
if not is_sparse:
col_id = 'NULL'
val_id = 'NULL'
row_dim = 0
col_dim = 0
while True:
if result_summary_table:
result_summary_table_temp = ("pg_temp.{0}_{1}".
format(unique_string(), grp_id))
if grouping_cols:
grp_value_dict = distinct_grouping_values[grp_id]
where_conditions = ' AND '.join(
["{0} = '{1}'".format(key, value)
for key, value in grp_value_dict.items()])
sparse_where_condition = ' AND ' + where_conditions
grouping_where_clause = ' WHERE ' + where_conditions
select_grouping_cols = (
', ' + ', '.join(["'{1}' AS {0}".format(key, value)
for key, value in grp_value_dict.items()]))
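# For example (hypothetical group), grp_value_dict = {'region': 'east'}
# yields grouping_where_clause = " WHERE region = 'east'" and
# select_grouping_cols = ", 'east' AS region".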
pca_union_call_list.append("""
{schema_madlib}._pca_union(
'{source_table}',
'{pc_table}',
'{pc_table_mean}',
'{row_id}',
{k_null},
'{grouping_cols}',
{lanczos_iter},
{use_correlation},
'{result_summary_table}',
'{result_summary_table_temp}',
{variance_null},
{grp_id},
$${grouping_where_clause}$$,
$${sparse_where_condition}$$,
$${select_grouping_cols}$$,
'{temp_table_columns}',
{is_sparse},
'{col_id}',
'{val_id}',
{row_dim},
{col_dim})
""".format(k_null='NULL' if k is None else k,
variance_null='NULL' if variance is None else variance,
**locals()))
grp_id += 1
if not grouping_cols_list or len(distinct_grouping_values) == grp_id:
break
# "SELECT <query_1>, <query_2>, <query_3>, ..." is expected to run each
# <query_i> in parallel.
pca_union_call = 'SELECT ' + ', '.join(pca_union_call_list)
plpy.execute(pca_union_call)
def _pca_union(schema_madlib, source_table, pc_table, pc_table_mean,
row_id, k, grouping_cols, lanczos_iter, use_correlation,
result_summary_table, result_summary_table_temp, variance,
grp_id, grouping_where_clause, sparse_where_condition,
select_grouping_cols, temp_table_columns, is_sparse, col_id,
val_id, row_dim, col_dim, **kwargs):
"""
This function does all the heavy lifting of PCA, for both pca and
pca_sparse. It computes the PCA of the matrix in source_table: sparse
input is densified first, the centered matrix is factorized via SVD
(creating tables for the three decomposition matrices), and the results
for a single group are inserted into the output tables.
Args:
@param source_table TEXT, -- Source table name (dense matrix)
@param pc_table TEXT, -- Output table name for the principal components
@param pc_table_mean TEXT, -- Output table name for the column means
@param row_id TEXT, -- Column name for the ID for each row
@param k INTEGER, -- Number of principal components to compute
@param grouping_cols TEXT, -- Comma-separated list of grouping columns (Default: NULL)
@param lanczos_iter INTEGER, -- The number of Lanczos iterations for the SVD calculation (Default: min(k+40, smallest matrix dimension))
@param use_correlation BOOLEAN, -- If True, the correlation matrix is used for principal components (Default: False)
@param result_summary_table TEXT, -- Table name to store the summary of results (Default: NULL)
@param result_summary_table_temp TEXT, -- Temporary table name for the per-group summary of results (Default: NULL)
@param variance DOUBLE PRECISION, -- The target proportion of variance (Default: NULL)
@param grp_id INTEGER, -- A placeholder ID for each group
@param grouping_where_clause TEXT, -- WHERE clause using grouping_cols
@param sparse_where_condition TEXT, -- WHERE condition using grouping_cols for sparse input
@param select_grouping_cols TEXT, -- SELECT clause using grouping_cols
@param temp_table_columns TEXT, -- SELECT clause for creating a temporary copy of the source_table
@param is_sparse BOOLEAN, -- Specifies if the PCA call is for sparse or dense matrices
@param col_id TEXT, -- Column name for the column coordinates (sparse only)
@param val_id TEXT, -- Column name for the sparse values (sparse only)
@param row_dim INTEGER, -- Number of rows in the sparse matrix
@param col_dim INTEGER -- Number of columns in the sparse matrix
Returns:
None
"""
startTime = time.time() # measure the starting time
# Step 1: Modify data format for sparse input
if is_sparse:
# Step 1.1: Densify the matrix for sparse input tables
# We densify the matrix because the recentering process will generate a
# dense matrix, so we just wrap around regular PCA.
# First we must copy the sparse matrix and add in the dimension information
sparse_temp = "pg_temp." + unique_string() + "_sparse"
# Add in the dimension information needed by the densifying process
create_temp_sparse_matrix_table_with_dims(source_table, sparse_temp,
row_id, col_id, val_id,
row_dim, col_dim, sparse_where_condition)
validate_sparse(sparse_temp,
{'row': row_id, 'col': col_id, 'val': val_id},
check_col=False)
# Step 1.2: Densify the input matrix
x_dense = "pg_temp." + unique_string() + "_dense"
plpy.execute("""
SELECT {schema_madlib}.matrix_densify(
'{sparse_temp}',
'row={row_id}, col={col_id}, val={val_id}',
'{x_dense}', 'row=row_id, val=row_vec')
""".format(schema_madlib=schema_madlib,
sparse_temp=sparse_temp, row_id=row_id,
col_id=col_id, val_id=val_id, x_dense=x_dense))
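# For example (hypothetical entries), a 2x3 sparse matrix containing the
# single triple (row=1, col=2, val=3.0) densifies to two array rows:
# row_id=1, row_vec=[0, 3.0, 0] and row_id=2, row_vec=[0, 0, 0].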
plpy.execute("""
DROP TABLE IF EXISTS {0};
""".format(sparse_temp))
source_table_grouped = x_dense
else:
# Creating this temp table is unnecessary when there is no grouping and
# the input table already has well-formed row_id values. It ensures PCA
# works even when the row_id column of the dense matrix format does not
# range from 1 to the number of rows.
source_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id)
plpy.execute("""
CREATE TABLE {source_table_grouped} AS
SELECT {temp_table_columns}
FROM {source_table}
{grouping_where_clause}
""".format(source_table_grouped=source_table_grouped,
source_table=source_table,
grouping_where_clause=grouping_where_clause,
temp_table_columns=temp_table_columns))
row_id = 'row_id'
# Make sure that the table has row_id and row_vec
source_table_copy = "pg_temp." + unique_string() + "_reformated_names"
created_new_table = cast_dense_input_table_to_correct_columns(
schema_madlib, source_table_grouped, source_table_copy, row_id)
if created_new_table:
plpy.execute("DROP TABLE {0}".format(source_table_grouped))
source_table_grouped = source_table_copy
[row_dim, col_dim] = get_dims(source_table_grouped,
{'row': 'row_id', 'col': 'col_id',
'val': 'row_vec'})
validate_dense(source_table_grouped,
{'row': 'row_id', 'val': 'row_vec'},
check_col=False, row_dim=row_dim)
if k:
if k > min(row_dim, col_dim):
plpy.error(
"PCA error: k cannot be larger than min(row_dim, col_dim)!")
curK = k
else:
curK = min(row_dim, col_dim)
# If lanczos_iter is 0, compute the default number of Lanczos iterations
if lanczos_iter == 0:
if k:
lanczos_iter = min(k + 40, min(col_dim, row_dim))
else:
lanczos_iter = min(col_dim, row_dim)
else:
if variance: # lanczos_iter overrides the proportion default for k
curK = lanczos_iter
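# Worked example for the default case: with k=10 on a 1000x300 matrix
# and lanczos_iter=0, the default resolves to
# min(10 + 40, min(300, 1000)) = 50 iterations.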
# Note: we currently don't support correlation matrices
# (use_correlation is validated to be False)
if not use_correlation:
# Step 2: Normalize the data (Column means)
dimension = col_dim
scaled_source_table = "pg_temp." + unique_string() + "_scaled_table"
column_mean_str = _recenter_data(schema_madlib,
source_table_grouped,
scaled_source_table,
'row_id',
'row_vec',
dimension)
# Step 3: Create temporary output & result summary table
svd_output_temp_table = "pg_temp." + unique_string() + "_svd_out_tbl"
# Step 4: Perform SVD
# Step 4.1: Perform upper part of SVD
if result_summary_table_temp:
t0 = time.time()
(source_table_svd, bd_pref) = _svd_upper_wrap(
schema_madlib, scaled_source_table, svd_output_temp_table,
row_id, curK, lanczos_iter, result_summary_table_temp)
# Calculate the total sum of squares, used for variance proportions
svd_var_s = add_postfix(svd_output_temp_table, "_s")
eigen_sum = plpy.execute(
"""
SELECT {schema_madlib}.array_sum(
{schema_madlib}.sum(
{schema_madlib}.array_square(row_vec)
)
)
FROM {scaled_source_table}
""".format(**locals()))[0]['array_sum']
# Step 4.2: Adjust the k value
if variance:
variance_tmp_table = "pg_temp." + unique_string() + "_var_tmp"
plpy.execute(
"""
CREATE TABLE {variance_tmp_table} AS (
SELECT row_id,
col_id,
sum(value*value) OVER(ORDER BY value DESC)
AS sum_value_square
FROM {svd_var_s})
""".format(variance_tmp_table=variance_tmp_table,
svd_var_s=svd_var_s))
ecount = plpy.execute("""
SELECT count(sum_value_square)
FROM {variance_tmp_table}
WHERE sum_value_square / {eigen_sum} < {variance}
""".format(variance_tmp_table=variance_tmp_table,
eigen_sum=eigen_sum,
variance=variance))
curK = ecount[0]['count']
# The next value is the first that crosses the threshold
if lanczos_iter > curK:
curK = curK + 1
plpy.execute("""
DROP TABLE IF EXISTS {variance_tmp_table}
""".format(variance_tmp_table=variance_tmp_table))
# Step 4.3: Perform the lower part of SVD
tmp_matrix_table = "temp_" + unique_string() + "_matrix"
tmp_matrix_s_table = add_postfix(tmp_matrix_table, "_s")
if variance: # remove elements after the variance threshold
plpy.execute("""
DELETE FROM {svd_var_s}
WHERE row_id >{curK};
ALTER TABLE {svd_var_s} RENAME TO {tmp_matrix_s_table};
""".format(**locals()))
_svd_lower_wrap(schema_madlib, source_table_svd,
svd_output_temp_table, row_id, curK, lanczos_iter, bd_pref,
tmp_matrix_s_table)
else:
tmp_matrix_table = svd_output_temp_table
_svd_lower_wrap(schema_madlib, source_table_svd,
svd_output_temp_table, row_id, curK, lanczos_iter, bd_pref)
# Step 4.4: Create the SVD result table
if result_summary_table_temp:
t1 = time.time()
[row_dim, col_dim] = get_dims(source_table_grouped,
{'row': 'row_id', 'col': 'col_id', 'val': 'row_vec'})
arguments = {'schema_madlib': schema_madlib,
'source_table': scaled_source_table,
'matrix_u': add_postfix(svd_output_temp_table, "_u"),
'matrix_v': add_postfix(svd_output_temp_table, "_v"),
'matrix_s': add_postfix(tmp_matrix_table, "_s"),
'row_dim': row_dim,
'col_dim': col_dim,
'result_summary_table': result_summary_table_temp,
'temp_prefix': "pg_temp." + unique_string(),
't0': t0, 't1': t1}
create_summary_table(**arguments)
# Step 5: Transpose SVD output matrix
svd_v_transpose = "pg_temp." + unique_string() + "transpose"
svd_output_temp_table_s = add_postfix(tmp_matrix_table, "_s")
svd_output_temp_table_v = add_postfix(svd_output_temp_table, "_v")
svd_output_temp_table_u = add_postfix(svd_output_temp_table, "_u")
plpy.execute(
"""
SELECT {schema_madlib}.matrix_trans(
'{svd_output_temp_table_v}',
'row=row_id, col=col_id, val=row_vec',
'{svd_v_transpose}', 'row=row_id, col=col_id, val=row_vec');
""".format(svd_output_temp_table_v=svd_output_temp_table_v,
svd_v_transpose=svd_v_transpose,
schema_madlib=schema_madlib)
)
# Step 6: Insert the output of SVD into the PCA table
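# Each principal component's std_dev is s_i / sqrt(n - 1), the sample
# standard deviation of the data projected onto that component; its
# proportion is s_i^2 / eigen_sum, the fraction of variance explained.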
plpy.execute(
"""
INSERT INTO {pc_table}
SELECT {svd_v_transpose}.row_id,
row_vec AS principal_components,
value / sqrt({row_dim} - 1) AS std_dev,
((value*value)/ {eigen_sum}) AS proportion
{select_grouping_cols}
FROM {svd_v_transpose},
{svd_output_temp_table_s}
WHERE ({svd_v_transpose}.row_id = {svd_output_temp_table_s}.row_id)
AND ({svd_v_transpose}.row_id <= {k})
AND value is not NULL
""".format(svd_output_temp_table_s=svd_output_temp_table_s,
k=curK,
svd_v_transpose=svd_v_transpose,
pc_table=pc_table,
row_dim=row_dim,
eigen_sum=eigen_sum,
select_grouping_cols=select_grouping_cols))
# Output the column mean
plpy.execute(
"""
INSERT INTO {pc_table_mean}
SELECT '{column_mean_str}'::FLOAT8[] AS column_mean
{select_grouping_cols}
""".format(pc_table_mean=pc_table_mean,
column_mean_str=column_mean_str,
select_grouping_cols=select_grouping_cols))
# Step 7: Append to the SVD summary table to get the PCA summary table
if result_summary_table_temp:
stopTime = time.time()
dt = (stopTime - startTime) * 1000.
plpy.execute(
"""
INSERT INTO {result_summary_table}
SELECT
rows_used,
{dt} AS "exec_time (ms)",
{iter} AS iter,
recon_error,
relative_recon_error,
{use_correlation} AS use_correlation
{select_grouping_cols}
FROM {result_summary_table_temp};
""".format(result_summary_table=result_summary_table,
dt=str(dt), iter=curK,
use_correlation=bool(use_correlation),
result_summary_table_temp=result_summary_table_temp,
select_grouping_cols=select_grouping_cols))
plpy.execute("DROP TABLE {result_summary_table_temp};".format(
result_summary_table_temp=result_summary_table_temp))
# Step 8: Output handling & cleanup
plpy.execute(
"""
DROP TABLE IF EXISTS {tmp_matrix_s_table};
DROP TABLE IF EXISTS {svd_v_transpose};
DROP TABLE IF EXISTS {source_table_copy};
DROP TABLE IF EXISTS {svd_output_temp_table};
DROP TABLE IF EXISTS {svd_output_temp_table_s};
DROP TABLE IF EXISTS {svd_output_temp_table_u};
DROP TABLE IF EXISTS {svd_output_temp_table_v};
DROP TABLE IF EXISTS {scaled_source_table};
DROP TABLE IF EXISTS {source_table_grouped};
""".format(svd_output_temp_table=svd_output_temp_table,
svd_output_temp_table_s=svd_output_temp_table_s,
svd_output_temp_table_u=svd_output_temp_table_u,
svd_output_temp_table_v=svd_output_temp_table_v,
scaled_source_table=scaled_source_table,
svd_v_transpose=svd_v_transpose,
source_table_copy=source_table_copy,
tmp_matrix_s_table=tmp_matrix_s_table,
source_table_grouped=source_table_grouped))
# ------------------------------------------------------------------------
# ------------------------------------------------------------------------
# Validate arguments: Same as pca
# ------------------------------------------------------------------------
def _validate_args(schema_madlib,
source_table,
pc_table,
k,
row_id,
col_id=None,
val_id=None,
row_dim=None,
col_dim=None,
lanczos_iter=0,
use_correlation=False,
result_summary_table=None,
variance=None):
"""
Validates all arguments passed to the PCA function
Args:
@param schema_madlib Name of the schema where MADlib is installed
@param source_table Name of the source table
@param pc_table Name of the output table
@param k Number of principal components to return
@param row_id Name of the row_id column
@param col_id Name of the col_id column (sparse input only)
@param val_id Name of the val_id column (sparse input only)
@param row_dim Number of rows in the sparse matrix
@param col_dim Number of columns in the sparse matrix
@param lanczos_iter The number of Lanczos iterations to use in the SVD calculation
@param use_correlation Whether the correlation matrix should be used instead of the covariance matrix
@param result_summary_table Name of the summary table
@param variance Target proportion of variance
Returns:
None
Throws:
plpy.error if any argument is invalid
"""
_assert(source_table is not None and table_exists(source_table),
"PCA error: Source data table {0} does not exist!".
format(str(source_table)))
components_err_msg = ("PCA error: components_param must be either "
"a positive integer or a float in the range (0.0, 1.0]!")
if not k and not variance:
plpy.error(components_err_msg)
if k and k <= 0:
plpy.error(components_err_msg)
if variance and (variance <= 0 or variance > 1):
plpy.error(components_err_msg)
# confirm output tables are valid
if pc_table:
_assert(not table_exists(pc_table, only_first_schema=True) and
not table_exists(pc_table + '_mean', only_first_schema=True),
"PCA error: Output table {pc_table}/{pc_table}_mean "
"already exist!".format(pc_table=pc_table))
else:
plpy.error("PCA error: Invalid output table prefix!")
_assert(columns_exist_in_table(source_table, [row_id], schema_madlib),
"PCA error: {1} column does not exist in {0}!".
format(source_table, "NULL" if row_id is None else row_id))
if lanczos_iter < 0:
plpy.error("PCA error: lanczos_iter can't be negative! "
"(Use zero for the default value.) "
"The provided value is {0}".format(lanczos_iter))
# If using sparse matrices, check that the parameters are reasonable
if col_id or val_id:
if not col_id:
plpy.error("PCA error: Column ID should be provided if "
"value ID is input!")
if not val_id:
plpy.error("PCA error: Value ID should be provided if "
"column ID is input!")
_assert(columns_exist_in_table(source_table, [col_id], schema_madlib),
"PCA error: {1} column does not exist in {0}!".
format(source_table, col_id))
_assert(columns_exist_in_table(source_table, [val_id], schema_madlib),
"PCA error: {1} column does not exist in {0}!".
format(source_table, val_id))
if not col_dim:
plpy.error("PCA error: Column dimensions should be provided if "
"using sparse matrices!")
if not row_dim:
plpy.error("PCA error: Row dimensions should be provided if "
"using sparse matrices!")
if row_dim <= 0:
plpy.error("PCA error: The row dimension must be larger than 0!")
if col_dim <= 0:
plpy.error("PCA error: The column dimension must be larger than 0!")
if use_correlation:
plpy.error("PCA error: Using the correlation matrix is not enabled! "
"This value must be set to FALSE.")
if result_summary_table:
if not result_summary_table.strip():
plpy.error("PCA error: Invalid result summary table name!")
_assert(not table_exists(result_summary_table, only_first_schema=True),
"PCA error: Result summary table {0} already exists!".
format(result_summary_table))
# ========================================================================
def _recenter_data(schema_madlib, source_table, output_table, row_id,
col_name, dimension):
"""
Recenters the data table by subtracting the column means.
The output is stored in output_table. The input table should have an
array column that contains the data.
Args:
@param schema_madlib Name of the schema where MADlib is installed
@param source_table Name of the source table
@param output_table Name of the output table
@param row_id Name of the row_id column
@param col_name Name of the array column from input table
@param dimension Number of columns in the data arrays
Returns:
Column mean
"""
# Step 1: Compute column mean values
x_scales = utils_ind_var_scales(tbl_data=source_table,
col_ind_var=col_name,
dimension=dimension,
schema_madlib=schema_madlib)
x_mean_str = _array_to_string(x_scales["mean"])
x_std_str = _array_to_string([1] * dimension)
# Step 2: Rescale the matrices
plpy.execute(
"""
CREATE TABLE {output_table}
m4_ifdef(`__POSTGRESQL__', `', `WITH (appendonly=true)')
AS
SELECT
{row_id} as row_id,
({schema_madlib}.utils_normalize_data(
{col_name},
'{x_mean_str}'::double precision[],
'{x_std_str}'::double precision[]))
AS row_vec
FROM {source_table}
""".format(schema_madlib=schema_madlib,
col_name=col_name,
row_id=row_id,
source_table=source_table,
output_table=output_table,
x_mean_str=x_mean_str,
x_std_str=x_std_str))
return x_mean_str
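# Worked example (hypothetical data): for rows [1, 2] and [3, 6], the
# column means are [2, 4], so the recentered rows written to
# output_table are [-1, -2] and [1, 2], and the returned string encodes
# the mean vector [2, 4].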
# ------------------------------------------------------------------------
# Sparse PCA train help function
def pca_sparse_help_message(schema_madlib, message=None, **kwargs):
"""
Given a help string, provide usage information
Args:
@param schema_madlib Name of the MADlib schema
@param message Helper message to print
Returns:
None
"""
if message is not None and \
message.lower() in ("usage", "help", "?"):
return """
-----------------------------------------------------------------------
USAGE
-----------------------------------------------------------------------
SELECT {schema_madlib}.pca_sparse_train(
source_table -- TEXT, Name of data table
pc_table -- TEXT, Name of the table containing the principal components
row_id -- TEXT, Column name for the row coordinates.
col_id -- TEXT, Column name for the column coordinates.
val_id -- TEXT, Column name for the sparse values.
row_dim -- INTEGER, The number of rows in the sparse matrix
col_dim -- INTEGER, The number of columns in the sparse matrix
components_param -- INTEGER OR FLOAT, The parameter to control the number of
principal components to calculate from the input data.
grouping_cols -- TEXT, Comma-separated list of grouping columns
(Default: NULL)
lanczos_iter -- INTEGER, The number of Lanczos iterations to use in the SVD calculation
(Default: minimum of the smallest input
matrix dimension and k+40)
use_correlation -- BOOLEAN, If True correlation matrix is used for principal components
(Default: False)
rslt_summary_table -- TEXT, Table name to store summary of results
(Default: NULL)
);
If components_param is an INTEGER, it denotes the number of principal
components to compute. If components_param is a FLOAT, it is used as the
target proportion of variance.
-------------------------------------------------------------------------
OUTPUT TABLES
-------------------------------------------------------------------------
A PCA model is created for each group, if grouping_cols is specified.
The output table ("pc_table" above) has the following columns:
row_id -- INTEGER, The ranking of the eigenvalues
principal_components -- FLOAT[], The principal components
std_dev -- FLOAT, The standard deviation associated with each
principal component
proportion -- FLOAT, The proportion of variance explained by each
principal component
grouping_cols -- The grouping columns (with their types), if any,
specified in grouping_cols
A secondary output table named "pc_table"_mean is also generated.
This table has the following columns:
column_mean -- FLOAT[], The column means of the input data
grouping_cols -- The grouping columns (with their types), if any,
specified in grouping_cols
-------------------------------------------------------------------------
RESULT SUMMARY TABLE
-------------------------------------------------------------------------
The result summary table ("rslt_summary_table" above) has the following columns:
rows_used -- INTEGER, Number of rows used in the PCA calculation
exec_time -- FLOAT, Number of milliseconds the PCA calculation took
use_correlation -- BOOLEAN, Value of parameter use_correlation
iter -- INTEGER, Number of iterations the SVD took to converge
recon_error -- FLOAT, Absolute error in the approximation
relative_recon_error -- FLOAT, Relative error in the approximation
grouping_cols -- The grouping columns (with their types), if any,
specified in grouping_cols
""".format(schema_madlib=schema_madlib)
else:
return """
----------------------------------------------------------------
Summary: Sparse PCA Training
----------------------------------------------------------------
Principal component analysis (PCA) is a mathematical procedure that uses an
orthogonal transformation to convert a set of observations of possibly
correlated variables into a set of values of linearly uncorrelated variables
called principal components. This transformation is defined in such a way that
the first principal component has the largest possible variance (i.e.,
accounts for as much of the variability in the data as possible), and each
succeeding component in turn has the highest variance possible under the
constraint that it be orthogonal to (i.e., uncorrelated with) the preceding
components.
--
For an overview on usage, run:
SELECT {schema_madlib}.pca_sparse_train('usage');
--
""".format(schema_madlib=schema_madlib)
def pca_help_message(schema_madlib, message=None, **kwargs):
"""
Given a help string, provide usage information
Args:
@param schema_madlib Name of the MADlib schema
@param message Helper message to print
Returns:
None
"""
if message is not None and \
message.lower() in ("usage", "help", "?"):
return """
-----------------------------------------------------------------------
USAGE
-----------------------------------------------------------------------
SELECT {schema_madlib}.pca_train(
source_table -- TEXT, Name of data table
pc_table -- TEXT, Name of the table containing the principal components
row_id -- TEXT, Column name for the row coordinates.
components_param -- INTEGER OR FLOAT, The parameter to control the number of
principal components to calculate from
the input data.
grouping_cols -- TEXT, Comma-separated list of grouping column names
(Default: NULL)
lanczos_iter -- INTEGER, The number of Lanczos iterations to use in the SVD calculation
(Default: minimum of the smallest input
matrix dimension and k+40)
use_correlation -- BOOLEAN, If True correlation matrix is used for principal components
(Default: False)
rslt_summary_table -- TEXT, Table name to store summary of results
(Default: NULL)
variance -- DOUBLE PRECISION, Proportion of variance
(Default: NULL)
);
If components_param is an INTEGER, it denotes the number of principal
components to compute. If components_param is a FLOAT, it is used as the
target proportion of variance.
-------------------------------------------------------------------------
OUTPUT TABLES
-------------------------------------------------------------------------
A PCA model is created for each group, if grouping_cols is specified.
The output table ("pc_table" above) has the following columns:
row_id -- INTEGER, The ranking of the eigenvalues
principal_components -- FLOAT[], The principal components
std_dev -- FLOAT, The standard deviation associated with each
principal component
proportion -- FLOAT, The proportion of variance explained by each
principal component
grouping_cols -- The grouping columns (with their types), if any,
specified in grouping_cols
A secondary output table named "pc_table"_mean is also generated.
This table has the following columns:
column_mean -- FLOAT[], The column means of the input data
grouping_cols -- The grouping columns (with their types), if any,
specified in grouping_cols
-------------------------------------------------------------------------
RESULT SUMMARY TABLE
-------------------------------------------------------------------------
The result summary table ("rslt_summary_table" above) has the following columns:
rows_used -- INTEGER, Number of rows used in the PCA calculation
exec_time -- FLOAT, Number of milliseconds the PCA calculation took
use_correlation -- BOOLEAN, Value of parameter use_correlation
iter -- INTEGER, Number of iterations the SVD took to converge
recon_error -- FLOAT, Absolute error in the approximation
relative_recon_error -- FLOAT, Relative error in the approximation
grouping_cols -- The grouping columns (with their types), if any,
specified in grouping_cols
""".format(schema_madlib=schema_madlib)
else:
return """
----------------------------------------------------------------
Summary: PCA Training
----------------------------------------------------------------
Principal component analysis (PCA) is a mathematical procedure that uses an
orthogonal transformation to convert a set of observations of possibly
correlated variables into a set of values of linearly uncorrelated variables
called principal components. This transformation is defined in such a way that
the first principal component has the largest possible variance (i.e.,
accounts for as much of the variability in the data as possible), and each
succeeding component in turn has the highest variance possible under the
constraint that it be orthogonal to (i.e., uncorrelated with) the preceding
components.
--
For an overview on usage, run:
SELECT {schema_madlib}.pca_train('usage');
--
""".format(schema_madlib=schema_madlib)