blob: bf0e5222227fc86106da1c902f8f54546973bb50 [file] [log] [blame]
"""
@file svd.py_in
@namespace linalg
"""
import plpy
from utilities.utilities import __mad_version
from utilities.utilities import unique_string
from utilities.utilities import _assert
from utilities.utilities import add_postfix
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_exists
from utilities.utilities import rename_table
from matrix_ops import get_dims
from matrix_ops import validate_sparse
from matrix_ops import validate_dense
from matrix_ops import create_temp_sparse_matrix_table_with_dims
from matrix_ops import cast_dense_input_table_to_correct_columns
import time
import math
version_wrapper = __mad_version()
string_to_array = version_wrapper.select_vecfunc()
array_to_string = version_wrapper.select_vec_return()
# To get a rank-k approximation of the original matrix if we perform k + s
# Lanczos bidiagonalization steps followed by the SVD of a small matrix B(k+s)
# then the algorithm constructs the best rank-k subspace in an extended subspace
# Span[U(k+s)]. Hence we obtain a better rank-k approximation than the one
# obtained after k steps steps of the standard Lanczos bidiagonalization
# algorithm. There is a memory limit to the number of extended steps and we
# restrict that to a fixed number of steps for now. Magic number computed using
# the 1GB memory limit. There are 3 matrices of size k x k placed in memory
# Hence MAX_LANCZOS_STEPS^2 * 3 < 10^9 bytes / 8 bytes
MAX_LANCZOS_STEPS = 5000 # conservative number
# global variable for actual number of Lanczos iterations
# value obtained optionally from user
actual_lanczos_iterations = 0
# ------------------------------------------------------------------------
def svd(schema_madlib, source_table, output_table_prefix,
row_id, k, nIterations, result_summary_table=None):
"""
Compute the Singular Value Decomposition of the matrix in source_table.
This function is the specific call for dense matrices and creates three
tables corresponding to the three decomposition matrices.
Args:
@param schema_madlib Schema where MADlib is installed
@param source_table Input table with the matrix to be decomposed
(in sparse or dense format)
@param output_table_prefix Prefix string for the output table names
@param row_id Name of the row_id column
@param k Number of eigen vectors to output
@param nIterations Number of iterations to run Lanczos algorithm
(Higher the number, greater the accuracy, and
greater the run time)
@param result_summary_table Name of the table to store summary of results
Returns:
None
"""
if result_summary_table:
t0 = time.time() # measure the starting time
old_msg_level = plpy.execute("select setting from pg_settings where \
name='client_min_messages'")[0]['setting']
plpy.execute("set client_min_messages to error")
_validate_args(schema_madlib, source_table, output_table_prefix,
k, row_id, nIterations, col_id=None, val_id=None,
result_summary_table=result_summary_table)
# Make sure that the input table has row_id and row_vec
# if not then make a copy of the source table and change column names
source_table_copy = "pg_temp." + unique_string() + "_2"
need_new_column_names = cast_dense_input_table_to_correct_columns(
schema_madlib, source_table, source_table_copy, row_id)
if(need_new_column_names):
source_table = source_table_copy
(new_source_table, bd_pref) = _svd_upper_wrap(schema_madlib, source_table,
output_table_prefix, row_id, k, nIterations, result_summary_table)
_svd_lower_wrap(schema_madlib, new_source_table, output_table_prefix,
row_id, k, nIterations, bd_pref)
if result_summary_table:
t1 = time.time()
[row_dim, col_dim] = get_dims(source_table,
{'row': 'row_id', 'col': 'col_id', 'val': 'row_vec'})
arguments = {'schema_madlib': schema_madlib,
'source_table': source_table,
'matrix_u': add_postfix(output_table_prefix, "_u"),
'matrix_v': add_postfix(output_table_prefix, "_v"),
'matrix_s': add_postfix(output_table_prefix, "_s"),
'row_dim': row_dim,
'col_dim': col_dim,
'result_summary_table': result_summary_table,
'temp_prefix': "pg_temp." + unique_string(),
't0': t0, 't1': t1}
create_summary_table(**arguments)
plpy.execute("DROP TABLE if exists {0}".format(source_table_copy))
plpy.execute("set client_min_messages to " + old_msg_level)
# ------------------------------------------------------------------------
def _validate_args(schema_madlib, source_table, output_table_prefix, k,
row_id, nIterations, col_id=None, val_id=None,
result_summary_table=None, is_block=False):
"""
Validates all arguments passed to the SVD function
Args:
@param schema_madlib Name of the schema where MADlib is installed
@param source_table Name of the source table
@param output_table Name of the output table
@param k Number of singular vectors to return
@param row_id Name of the row_id column
@param col_id Name of the col_id column
@param val_id Name of the val_id column
@param result_summary_table Name of summary table
Returns:
None
Throws:
plpy.error if any argument is invalid
"""
_assert(source_table is not None and table_exists(source_table),
"SVD error: Source data table does not exist!")
if k <= 0:
plpy.error("SVD error: k must be a positive integer!")
# output tables are created from the prefix by appending _u, _s, and _v as
# suffixes
if output_table_prefix:
output_table_names = [add_postfix(output_table_prefix, i)
for i in ('_u', '_s', '_v')]
for each_table in output_table_names:
_assert(not table_exists(each_table, only_first_schema=True),
"SVD error: Output table {0}"
" already exists!".format(str(each_table)))
else:
plpy.error("SVD error: Invalid output table prefix!")
if not is_block:
_assert(columns_exist_in_table(source_table, [row_id], schema_madlib),
"SVD error: {1} column does not exist in {0}!"
.format(source_table, "NULL" if row_id is None else row_id))
if col_id or val_id:
# sparse matrix format
if not col_id:
plpy.error("SVD error: column ID should be provided if"
" value ID is input!")
if not val_id:
plpy.error("SVD error: value ID should be provided if"
" column ID is input!")
_assert(columns_exist_in_table(source_table, [col_id], schema_madlib),
"SVD error: {1} column does not exist in {0}!"
.format(source_table, col_id))
_assert(columns_exist_in_table(source_table, [val_id], schema_madlib),
"SVD error: {1} column does not exist in {0}!"
.format(source_table, val_id))
validate_sparse(source_table,
{'row': row_id, 'col': col_id, 'val': val_id},
check_col=False)
if result_summary_table is not None:
if not result_summary_table.strip():
plpy.error("SVD error: Invalid result summary table name!")
_assert(not table_exists(result_summary_table, only_first_schema=True),
"SVD error: Result summary table {0}"
" already exists!".format(result_summary_table))
# ------------------------------------------------------------------------
def create_summary_table(**args):
""" Summarize results from SVD in a summary table. """
global actual_lanczos_iterations
dt = (args['t1'] - args['t0']) * 1000.
plpy.execute(
"""
SELECT {schema_madlib}.matrix_mult(
'{matrix_u}', 'row=row_id, col=col_id, val=row_vec',
'{matrix_s}', 'row=row_id, col=col_id, val=value',
'{temp_prefix}_a', 'row=row_id, col=col_id, val=row_vec'
);
""".format(**args))
plpy.execute(
"""
SELECT {schema_madlib}.matrix_mult(
'{temp_prefix}_a', 'row=row_id, col=col_id, val=row_vec, trans=false',
'{matrix_v}', 'row=row_id, col=col_id, val=row_vec, trans=True',
'{temp_prefix}_b', 'row=row_id, col=col_id, val=row_vec'
);
""".format(**args))
err = plpy.execute(
"""
SELECT avg(s) / ({col_dim}::float8) as p
FROM (
SELECT {schema_madlib}.array_dot(arr::float8[], arr::float8[]) as s
FROM (
SELECT {schema_madlib}.array_sub(a.row_vec::float8[],
b.row_vec::float8[]) as arr
FROM
{source_table} a
join
{temp_prefix}_b b
on (a.row_id::int4 = b.row_id::int4)
) Q1
) Q2
""".format(**args))[0]['p']
norm = plpy.execute(
"""
SELECT avg(s) / ({col_dim}::float8) as p
FROM (
SELECT
{schema_madlib}.array_dot(row_vec::float8[], row_vec::float8[]) as s
FROM
{source_table}
) Q1
""".format(**args))[0]['p']
relative_err = err / norm
plpy.execute(
"""
CREATE TABLE {result_summary_table} (rows_used INTEGER,
\"exec_time (ms)\" double precision,
iter INTEGER,
recon_error FLOAT8,
relative_recon_error FLOAT8);
""".format(result_summary_table=args['result_summary_table']))
plpy.execute(
"""
INSERT INTO {result_summary_table} VALUES
({row_dim}, '{dt:.2f}', {nIterations}, {err}::double precision,
{relative_err}::double precision);
""".format(dt=dt,
nIterations=actual_lanczos_iterations,
err=math.sqrt(err), relative_err=math.sqrt(relative_err), **args))
plpy.execute(
"""
DROP TABLE IF EXISTS {temp_prefix}_a;
DROP TABLE IF EXISTS {temp_prefix}_b;
""".format(**args))
# ------------------------------------------------------------------------
def svd_block(schema_madlib, source_table, output_table_prefix,
k, nIterations, result_summary_table=None):
""" Compute SVD of matrix using block representation """
old_msg_level = plpy.execute("select setting from pg_settings where \
name='client_min_messages'")[0]['setting']
plpy.execute("set client_min_messages to error")
_validate_args(schema_madlib, source_table, output_table_prefix,
k, row_id=None, nIterations=nIterations,
col_id=None, val_id=None,
result_summary_table=result_summary_table,
is_block=True)
[row_dim, col_dim] = get_dims(source_table,
{'row': 'row_id', 'col': 'col_id',
'val': 'block'},
is_block=True)
if k > min(row_dim, col_dim):
plpy.error("SVD error: k cannot be larger than min(row_dim, col_dim)!")
t0 = time.time() # measure the starting time
# _svd expects a narrow dense matrix(i.e. row_dim >= col_dim)
# If the input matrix is a wide dense matrix(i.e. row_dim < col_dim),
# We need to transpose it into a narrow one
if col_dim > row_dim:
x_trans = "pg_temp." + unique_string() + "_3"
plpy.execute("""
SELECT {schema_madlib}.matrix_block_trans(
'{source_table}', 'row=row_id, col=col_id, val=block',
'{x_trans}', 'row=row_id, col=col_id, val=block')
""".format(schema_madlib=schema_madlib,
source_table=source_table, x_trans=x_trans))
_svd(schema_madlib, x_trans, output_table_prefix, k, nIterations, True)
# Switch U and V
_transpose_UV(schema_madlib,
add_postfix(output_table_prefix, "_u"),
add_postfix(output_table_prefix, "_v"))
plpy.execute("DROP TABLE IF EXISTS {0}".format(x_trans))
else:
_svd(
schema_madlib,
source_table,
output_table_prefix,
k,
nIterations,
True)
if result_summary_table:
t1 = time.time()
src_unblock = "pg_temp." + unique_string()
plpy.execute("""
select {schema_madlib}.matrix_unblockize(
'{source_table}', 'row=row_id, col=col_id, val=block',
'{src_unblock}', 'row=row_id, col=col_id, val=row_vec')
""".format(source_table=source_table,
src_unblock=src_unblock,
schema_madlib=schema_madlib))
arguments = {'schema_madlib': schema_madlib,
'source_table': src_unblock,
'matrix_u': add_postfix(output_table_prefix, "_u"),
'matrix_v': add_postfix(output_table_prefix, "_v"),
'matrix_s': add_postfix(output_table_prefix, "_s"),
'row_dim': row_dim,
'col_dim': col_dim,
'result_summary_table': result_summary_table,
'temp_prefix': "pg_temp." + unique_string(),
't0': t0, 't1': t1}
create_summary_table(**arguments)
plpy.execute("set client_min_messages to " + old_msg_level)
# ------------------------------------------------------------------------
def svd_sparse(schema_madlib, source_table, output_table_prefix,
row_id, col_id, value, row_dim, col_dim, k, nIterations,
result_summary_table=None):
"""
Compute the Singular Value Decomposition of a sparse matrix
This function is the specific call for sparse matrices and creates three
tables corresponding to the three decomposition matrices.
Args:
@param schema_madlib Schema where MADlib is installed
@param source_table Input table with the matrix to be decomposed
(in sparse format)
@param output_table_prefix Prefix string for the output table names
@param row_id Name of the row_id column
@param col_id Name of the col_id column
@param val Name of the value column
@param k Number of eigen vectors to output
@param nIterations Number of iterations to run Lanczos algorithm
(Higher the number, greater the accuracy, and
greater the run time)
@param result_summary_table Name of the table to store summary of results
Returns:
None
"""
old_msg_level = plpy.execute("select setting from pg_settings where \
name='client_min_messages'")[0]['setting']
plpy.execute("set client_min_messages to error")
# Convert the sparse matrix into a dense matrix
if result_summary_table:
t0 = time.time() # measure the starting time
_validate_args(schema_madlib, source_table, output_table_prefix,
k, row_id, nIterations, col_id=col_id, val_id=value,
result_summary_table=result_summary_table)
# copy the input table to a temporary table since all matrix operation
# modules require the dimensionality of the sparse matrix in the table
# as a row: (row_dim, col_dim, NULL)
converted_tbl = "pg_temp." + unique_string()
create_temp_sparse_matrix_table_with_dims(source_table,
converted_tbl,
row_id, col_id, value,
row_dim, col_dim)
source_table = converted_tbl
x_dense = "pg_temp." + unique_string() + "_1"
plpy.execute("""
SELECT {schema_madlib}.matrix_densify(
'{source_table}', 'row={row_id}, col={col_id}, val={value}',
'{x_dense}', 'val=row_vec')
""".format(schema_madlib=schema_madlib,
source_table=source_table,
x_dense=x_dense,
row_id=row_id, col_id=col_id, value=value))
# Call SVD for the dense matrix
svd(schema_madlib, x_dense, output_table_prefix,
row_id, k, nIterations, result_summary_table)
if result_summary_table:
t1 = time.time()
dt = (t1 - t0) * 1000.
plpy.execute("""
UPDATE {result_summary_table} SET \"exec_time (ms)\" = {dt:.2f};
""".format(result_summary_table=result_summary_table, dt=dt))
plpy.execute("DROP TABLE if exists {x_dense}".format(x_dense=x_dense))
plpy.execute("DROP TABLE if exists {converted_tbl}"
.format(converted_tbl=converted_tbl))
plpy.execute("set client_min_messages to " + old_msg_level)
# ------------------------------------------------------------------------
def svd_sparse_native(schema_madlib, source_table, output_table_prefix,
row_id, col_id, val, row_dim, col_dim, k, nIterations,
result_summary_table=None):
"""
Compute the Singular Value Decomposition of a sparse matrix
This function is the specific call for sparse matrices and creates three
tables corresponding to the three decomposition matrices.
Args:
@param schema_madlib Schema where MADlib is installed
@param source_table Input table with the matrix to be decomposed
(in sparse format)
@param output_table_prefix Prefix string for the output table names
@param row_id Name of the row_id column
@param col_id Name of the col_id column
@param val Name of the value column
@param k Number of eigen vectors to output
@param nIterations Number of iterations to run Lanczos algorithm
(Higher the number, greater the accuracy, and
greater the run time)
@param result_summary_table Name of the table to store summary of results
Returns:
None
"""
old_msg_level = plpy.execute("select setting from pg_settings where \
name='client_min_messages'")[0]['setting']
plpy.execute("set client_min_messages to error")
_validate_args(schema_madlib, source_table, output_table_prefix,
k, row_id, nIterations, col_id, val,
result_summary_table, False)
converted_tbl = "pg_temp." + unique_string()
plpy.execute("""
CREATE TABLE {converted_tbl} as
SELECT
{row_id}::int4 as row_id,
{col_id}::int4 as col_id,
{value}::float8 as value
FROM {source_table}
WHERE {value} is not NULL
""".format(row_id=row_id, col_id=col_id, value=val,
source_table=source_table,
converted_tbl=converted_tbl))
source_table = converted_tbl
# NOTE: At this point source_table has hard-coded column names:
# 'row_id' --- 'col_id' --- 'value'
t0 = time.time() # measure the starting time
_svd(schema_madlib, source_table, output_table_prefix, k,
nIterations, False, 'row_id', 'col_id', 'value')
if col_dim > row_dim:
# Switch U and V
_transpose_UV(schema_madlib,
add_postfix(output_table_prefix, "_u"),
add_postfix(output_table_prefix, "_v"))
if result_summary_table:
t1 = time.time()
src_dense = "pg_temp." + unique_string()
plpy.execute("""
select {schema_madlib}.matrix_densify(
'{source_table}', 'row=row_id, col=col_id, val=value',
'{src_dense}', 'val=row_vec')
""".format(source_table=source_table, row_id=row_id, col_id=col_id,
val=val, src_dense=src_dense, schema_madlib=schema_madlib))
arguments = {'schema_madlib': schema_madlib,
'source_table': src_dense,
'matrix_u': add_postfix(output_table_prefix, "_u"),
'matrix_v': add_postfix(output_table_prefix, "_v"),
'matrix_s': add_postfix(output_table_prefix, "_s"),
'row_dim': row_dim,
'col_dim': col_dim,
'result_summary_table': result_summary_table,
'temp_prefix': "pg_temp." + unique_string(),
't0': t0,
't1': t1}
create_summary_table(**arguments)
plpy.execute("DROP TABLE if exists {0}".format(converted_tbl))
plpy.execute("set client_min_messages to " + old_msg_level)
# ------------------------------------------------------------------------
def _svd_upper_wrap(schema_madlib, source_table, output_table_prefix,
row_id, k, nIterations, result_summary_table):
"""
Compute the Singular Value Decomposition of a sparse matrix
This function is the wrapper for the upper half of svd function.
Returns a tuple (new_source_table,bd_pref). new_source_table is returned
to be used in the lower part in case the table is transposed. bd_pref is a
tuple containing two table names, created with unique strings and required
in the lower half of svd
Args:
@param schema_madlib Schema where MADlib is installed
@param source_table Input table with the matrix to be decomposed
@param output_table_prefix Prefix string for the output table names
@param row_id Name of the row_id column
@param col_id Name of the col_id column
@param val Name of the value column
@param k Number of eigen vectors to output
@param nIterations Number of iterations to run Lanczos algorithm
(Higher the number, greater the accuracy, and
greater the run time)
@param result_summary_table Name of the table to store summary of results
Returns:
(source_table,bd_pref)
"""
[row_dim, col_dim] = get_dims(source_table,
{'row': 'row_id', 'col': 'col_id', 'val': 'row_vec'})
validate_dense(source_table,
{'row': 'row_id', 'val': 'row_vec'},
check_col=False, row_dim=row_dim)
if k > min(row_dim, col_dim):
plpy.error("SVD error: k cannot be larger than min(row_dim, col_dim)!")
# _svd expects a narrow dense matrix(i.e. row_dim >= col_dim)
# If the input matrix is a wide dense matrix(i.e. row_dim < col_dim),
# We need to transpose it into a narrow one
if col_dim > row_dim:
x_trans = "pg_temp." + unique_string() + "_xt_3"
plpy.execute("""
SELECT {schema_madlib}.matrix_trans(
'{source_table}', 'row=row_id, val=row_vec',
'{x_trans}', 'row=row_id, val=row_vec')
""".format(schema_madlib=schema_madlib,
source_table=source_table, x_trans=x_trans))
bd_pref = _svd_upper(
schema_madlib,
x_trans,
output_table_prefix,
k,
nIterations,
is_block=False)
return (x_trans, bd_pref)
else:
bd_pref = _svd_upper(
schema_madlib,
source_table,
output_table_prefix,
k,
nIterations,
is_block=False)
return (source_table, bd_pref)
# ------------------------------------------------------------------------
def _svd_upper(schema_madlib, source_table, output_table_prefix,
k, nIterations, is_block=False,
row_id=None, col_id=None, val=None):
"""
Compute the Singular Value Decomposition of a sparse matrix
This function is the upper half of svd function.
Returns a tuple (bidiagonal_prefix,svd_bidiagonal). Both of these table
names are created with unique strings and required in the lower half of svd
Args:
@param schema_madlib Schema where MADlib is installed
@param source_table Input table with the matrix to be decomposed
@param output_table_prefix Prefix string for the output table names
@param row_id Name of the row_id column
@param col_id Name of the col_id column
@param val Name of the value column
@param k Number of eigen vectors to output
@param nIterations Number of iterations to run Lanczos algorithm
(Higher the number, greater the accuracy, and
greater the run time)
@param result_summary_table Name of the table to store summary of results
Returns:
(bidiagonal_prefix,svd_bidiagonal)
"""
nIterations_arg = (nIterations, "NULL")[nIterations is None]
# Bidiagonalize the input matrix using the Lanczos algorithm
bidiagonal_prefix = "pg_temp." + unique_string() + 'bidiag'
if row_id or col_id or val:
if not (row_id and col_id and val):
plpy.error("SVD error: All column names for sparse matrices"
" should be provided")
is_sparse = True
else:
is_sparse = False
if is_sparse:
plpy.execute("""
SELECT {schema_madlib}.__svd_lanczos_bidiagonalize_sparse(
'{source_table}',
'{row_id}', '{col_id}', '{val}',
'{bidiagonal_prefix}',
{nIterations}, {k})
""".format(schema_madlib=schema_madlib, source_table=source_table,
row_id=row_id, col_id=col_id, val=val, k=k,
bidiagonal_prefix=bidiagonal_prefix,
nIterations=nIterations_arg))
else:
plpy.execute("""
SELECT {schema_madlib}.__svd_lanczos_bidiagonalize(
'{source_table}',
'{bidiagonal_prefix}',
{nIterations}, {k}, {is_block})
""".format(schema_madlib=schema_madlib, source_table=source_table,
k=k, bidiagonal_prefix=bidiagonal_prefix,
nIterations=nIterations_arg, is_block=is_block))
# Compute SVD decomposition of the bidiagonal matrix
svd_bidiagonal = "pg_temp." + unique_string() + 'svd_output'
rv = plpy.execute("""
SELECT
{schema_madlib}.array_max(row_id::INTEGER[]) as row_dim,
{schema_madlib}.array_max(col_id::INTEGER[]) as col_dim
FROM
{bidiagonal_prefix}_b
""".format(schema_madlib=schema_madlib,
bidiagonal_prefix=bidiagonal_prefix))[0]
row_dim = rv['row_dim']
col_dim = rv['col_dim']
if row_dim != col_dim:
plpy.error('invalid input matrix: should be a suqare matrix')
plpy.execute("""
DROP TABLE IF EXISTS {svd_bidiagonal};
CREATE TABLE {svd_bidiagonal} as
SELECT {schema_madlib}.__svd_decompose_bidiag(
row_id::INTEGER[], col_id::INTEGER[],
value::FLOAT8[]) AS svd_output
FROM {bidiagonal_prefix}_b;
""".format(schema_madlib=schema_madlib,
svd_bidiagonal=svd_bidiagonal,
bidiagonal_prefix=bidiagonal_prefix))
non_zero_svals = plpy.execute("""
SELECT count(*) AS nz
FROM (
SELECT unnest((svd_output).singular_values) AS i
FROM {svd_bidiagonal}
) Q1
WHERE i > 0
""".format(svd_bidiagonal=svd_bidiagonal))[0]['nz']
if k > non_zero_svals:
plpy.warning("Value of 'k' is greater than the number of non-zero "
"singular values. Outputing only {nz} instead of {k} "
"eigen values/vectors".format(nz=non_zero_svals, k=k)
)
k = non_zero_svals
# Compute the singular values and output to sparse table
# matrix_s_tmp = add_postfix(output_table_prefix, "_stmp")
matrix_s = add_postfix(output_table_prefix, "_s")
plpy.execute(
"""
CREATE TABLE {matrix_s} AS
SELECT * FROM (
SELECT
generate_series(1,
array_upper((svd_output).singular_values, 2)) as row_id,
generate_series(1,
array_upper((svd_output).singular_values, 2)) as col_id,
unnest((svd_output).singular_values) as value
FROM {svd_bidiagonal}) s
ORDER BY row_id
LIMIT {k}
""".format(**locals()))
return (bidiagonal_prefix, svd_bidiagonal)
# ------------------------------------------------------------------------
def _svd_lower_wrap(schema_madlib, source_table, output_table_prefix,
row_id, k, nIterations, bd_pref, matrix_s=None):
"""
Compute the Singular Value Decomposition of a sparse matrix
This function is the wrapper for the lower half of svd function.
Args:
@param schema_madlib Schema where MADlib is installed
@param source_table Input table with the matrix to be decomposed
@param output_table_prefix Prefix string for the output table names
@param row_id Name of the row_id column
@param k Number of eigen vectors to output
@param nIterations Number of iterations to run Lanczos algorithm
(Higher the number, greater the accuracy, and
greater the run time)
@param bd_pref Tuple of table names related to the bidiagonalization
Returns:
None
"""
_svd_lower(schema_madlib, source_table, output_table_prefix,
k, nIterations, bd_pref, False, None, None, None, matrix_s)
if "_xt_3" in source_table and "pg_temp." in source_table:
# Switch U and V
_transpose_UV(schema_madlib,
add_postfix(output_table_prefix, "_u"),
add_postfix(output_table_prefix, "_v"))
plpy.execute(
"""DROP TABLE IF EXISTS {source_table}""".format(
source_table=source_table
))
return None
# ------------------------------------------------------------------------
def _svd_lower(schema_madlib, source_table, output_table_prefix,
k, nIterations, bd_pref, is_block=False,
row_id=None, col_id=None, val=None, matrix_s=None):
"""
Compute the Singular Value Decomposition of a sparse matrix
This function is the lower half of svd function.
Args:
@param schema_madlib Schema where MADlib is installed
@param source_table Input table with the matrix to be decomposed
@param output_table_prefix Prefix string for the output table names
@param row_id Name of the row_id column
@param k Number of eigen vectors to output
@param nIterations Number of iterations to run Lanczos algorithm
(Higher the number, greater the accuracy, and
greater the run time)
@param bd_pref Tuple of table names related to the bidiagonalization
Returns:
None
"""
# Compute the singular values and output to sparse table
if matrix_s is None:
matrix_s = add_postfix(output_table_prefix, "_s")
bidiagonal_prefix = bd_pref[0]
svd_bidiagonal = bd_pref[1]
plpy.execute("""
INSERT INTO {matrix_s}
SELECT {k}, {k}, NULL
FROM {svd_bidiagonal}
LIMIT 1
""".format(**locals()))
# Compute the left singular matrix and output to table
plpy.execute("""
SELECT
{schema_madlib}.__svd_vec_trans_mult_matrix(
'{bidiagonal_prefix}_p',
'{svd_bidiagonal}',
{k},
true,
'{matrix_u}'
)
""".format(schema_madlib=schema_madlib,
matrix_u=add_postfix(output_table_prefix, "_u"),
bidiagonal_prefix=bidiagonal_prefix,
svd_bidiagonal=svd_bidiagonal,
k=k))
# Compute the right singular matrix and output to table
plpy.execute("""
SELECT
{schema_madlib}.__svd_vec_trans_mult_matrix(
'{bidiagonal_prefix}_q',
'{svd_bidiagonal}',
{k},
false,
'{matrix_v}'
)
""".format(schema_madlib=schema_madlib,
matrix_v=add_postfix(output_table_prefix, "_v"),
bidiagonal_prefix=bidiagonal_prefix,
svd_bidiagonal=svd_bidiagonal,
k=k))
# plpy.execute("DROP TABLE if exists {matrix_s_tmp}"
# .format(matrix_s_tmp=matrix_s_tmp))
plpy.execute("DROP TABLE if exists {svd_bidiagonal}"
.format(svd_bidiagonal=svd_bidiagonal))
plpy.execute("""
DROP TABLE if exists {bidiagonal_prefix}_p;
DROP TABLE if exists {bidiagonal_prefix}_q;
DROP TABLE if exists {bidiagonal_prefix}_b;
""".format(bidiagonal_prefix=bidiagonal_prefix))
return None
# ------------------------------------------------------------------------
def _svd(schema_madlib, source_table, output_table_prefix,
k, nIterations, is_block=False,
row_id=None, col_id=None, val=None):
"""
Compute the SVD of a matrix (sparse or dense) into individual tables
The function computes SVD (USV') of a matrix using the Lanczos
Bidiagonalization method and stores the 'U' and 'V'
matrix as dense matrices and the 'S' matrix as a sparse matrix.
Args:
@param schema_madlib Schema where MADlib is installed
@param source_table Input table with the matrix to be decomposed
(in sparse or dense format)
@param output_table_prefix Prefix string for the output table names
@param k Number of eigen vectors to output
@param nIterations Number of iterations to run Lanczos algorithm
(Higher the number, greater the accuracy, and
greater the run time)
@param is_block Use block variant of matrix operations if True
@param row_id Name of the row_id column
@param col_id Name of the col_id column
@param val Name of the value column
"""
bd_pref = _svd_upper(schema_madlib, source_table, output_table_prefix, k,
nIterations, is_block, row_id, col_id, val)
_svd_lower(schema_madlib, source_table, output_table_prefix, k,
nIterations, bd_pref, is_block, row_id, col_id, val)
# ------------------------------------------------------------------------
def _lanczos_bidiagonalize_create_pq_table(
schema_madlib, pq_table_prefix, col_dim):
"""
Creates and initializes the P and Q (left and right) matrices output of
Lanczos bidiagonalization.
Lanczos bidiagonalization decomposes matrix as A = P B Q
These three matrices are created and initialized as tables.
Args:
@param schema_madlib Madlib schema
@param pq_table_prefix Prefix string for the table names
@param col_dim Number of columns in original (A) matrix
"""
# Create and initialize the P table
plpy.execute("""
CREATE TABLE {pq_table_prefix}_p (
id INT,
alpha FLOAT8,
pvec FLOAT8[]
)
""".format(pq_table_prefix=pq_table_prefix))
plpy.execute("""
INSERT INTO {pq_table_prefix}_p VALUES(0, 0, NULL);
""".format(pq_table_prefix=pq_table_prefix))
# Create and initialize the Q table
plpy.execute("""
CREATE TABLE {pq_table_prefix}_q (
id INT,
beta FLOAT8,
qvec FLOAT8[]
);
""".format(pq_table_prefix=pq_table_prefix))
plpy.execute("""
INSERT INTO {pq_table_prefix}_q
SELECT 1, 0, {schema_madlib}.__svd_unit_vector({col_dim})
""".format(schema_madlib=schema_madlib,
pq_table_prefix=pq_table_prefix, col_dim=col_dim))
# ------------------------------------------------------------------------
def _lanczos_bidiagonalize_output_results(schema_madlib, output_table_prefix,
pq_table_prefix, k):
"""
Outputs the P, Q, and B matrices.
Lanczos bidiagonalization decomposes matrix as A = P B Q
These three matrices are outputed as database tables.
Args:
@param output_table_prefix Prefix string for the output table names
@param pq_table_prefix Prefix string for the table names
@param k Number of eigen vectors to output
"""
# Output P vectors
plpy.execute("""
CREATE TABLE {output_table_prefix}_p (row_id INT, row_vec FLOAT8[])
""".format(output_table_prefix=output_table_prefix))
plpy.execute("""
INSERT INTO {output_table_prefix}_p
SELECT id, pvec
FROM {pq_table_prefix}_p
WHERE id > 0
""".format(output_table_prefix=output_table_prefix,
pq_table_prefix=pq_table_prefix))
# Output Q vectors
plpy.execute("""
CREATE TABLE {output_table_prefix}_q (row_id INT, row_vec FLOAT8[])
""".format(output_table_prefix=output_table_prefix))
plpy.execute("""
INSERT INTO {output_table_prefix}_q
SELECT id, qvec
FROM {pq_table_prefix}_q
WHERE id <= {k}
""".format(output_table_prefix=output_table_prefix,
pq_table_prefix=pq_table_prefix, k=k))
# Output B matrix
plpy.execute("""
DROP TABLE IF EXISTS {output_table_prefix}_b
""".format(output_table_prefix=output_table_prefix))
plpy.execute("""
CREATE TABLE {output_table_prefix}_b AS
SELECT
array_agg(row_id) AS row_id,
array_agg(col_id) AS col_id,
array_agg(val) AS value
FROM (
SELECT id::int4 AS row_id,
id::int4 AS col_id,
alpha AS val
FROM {pq_table_prefix}_p
WHERE id > 0
UNION
SELECT id - 1 AS row_id,
id AS col_id,
beta AS val
FROM {pq_table_prefix}_q
WHERE id > 1 AND id <= {k}
) t
""".format(output_table_prefix=output_table_prefix,
pq_table_prefix=pq_table_prefix, k=k))
# ------------------------------------------------------------------------
def lanczos_bidiagonalize(schema_madlib, source_table, output_table_prefix,
nIterations, k, is_block=False):
"""
Decomposes an input matrix as A = P B Q, where B is a bidiagonal matrix.
Args:
@param schema_madlib Madlib schema
@param source_table Input matrix stored in a table
@param output_table_prefix Prefix string for the output table names
@param nIterations Number of iterations to run the Lanczos algorithm
@param k Number of eigen vectors to output
@param is_block Should the block variant of matrix operations be used?
Note that source_table must be a dense matrix in the array format and the
row_dim should be no less than the col_dim. The column names of the source_table
are assumed to be (row_id, row_vec)
The value of k should be in the range of [1, col_dim]. When k < col_dim, it
will generate an approximated results.
"""
global actual_lanczos_iterations
# Get the dimensions of the matrix
val = 'block' if is_block else 'row_vec'
[row_dim, col_dim] = get_dims(source_table,
{'row': 'row_id',
'col': 'col_id',
'val': val},
is_block=is_block)
if row_dim < col_dim:
plpy.error("SVD error: row dimension ({0}) should be not less than"
"column dimension ({1})".format(row_dim, col_dim))
# Limit the size of matrix B (which will be kept in memory)
# See note at top about MAX_LANCZOS_STEPS
if nIterations is None:
nIterations = min(col_dim, MAX_LANCZOS_STEPS)
elif nIterations < k or nIterations > col_dim:
plpy.error("SVD error: n_Iterations should be"
" in the range of [{0}, {1}]".format(k, col_dim))
elif nIterations > MAX_LANCZOS_STEPS:
plpy.error("SVD error: n_Iterations"
" should be less than {0}".format(MAX_LANCZOS_STEPS))
actual_lanczos_iterations = nIterations
pq_table_prefix = "pg_temp." + unique_string() + "_8"
_lanczos_bidiagonalize_create_pq_table(
schema_madlib, pq_table_prefix, col_dim)
# Transform the matrix
x_trans = "pg_temp." + unique_string() + "_9"
matrix_trans_function = 'matrix_trans'
if is_block:
matrix_trans_function = 'matrix_block_trans'
plpy.execute("""
SELECT {schema_madlib}.{matrix_trans_function}(
'{source_table}', 'row=row_id, col=col_id, val={val}',
'{x_trans}', 'row=row_id, col=col_id, val={val}')
""".format(**locals()))
if is_block:
lanczos_sql = """
{schema_madlib}.__svd_block_lanczos_agg(
row_id::int4, col_id::int4, block, Q.qvec, {row_dim}) AS x_qvec
""".format(schema_madlib=schema_madlib, row_dim=row_dim)
else:
lanczos_sql = """
{schema_madlib}.__svd_lanczos_agg(
row_id::int4, row_vec, Q.qvec, {row_dim}) AS x_qvec
""".format(schema_madlib=schema_madlib, row_dim=row_dim)
if is_block:
lanczos_sql_trans = """
{schema_madlib}.__svd_block_lanczos_agg(
row_id::int4, col_id::int4, block, P.pvec, {col_dim}) AS x_trans_pvec
""".format(schema_madlib=schema_madlib, col_dim=col_dim)
else:
lanczos_sql_trans = """
{schema_madlib}.__svd_lanczos_agg(
row_id::int4, row_vec, P.pvec, {col_dim}) AS x_trans_pvec
""".format(schema_madlib=schema_madlib, col_dim=col_dim)
# Perform Lanczos iteration
for i in range(1, nIterations + 1):
plpy.execute("""
INSERT INTO {pq_table_prefix}_p
SELECT id, (f).*
FROM (
SELECT
{i} as id,
{schema_madlib}.__svd_lanczos_pvec(x_qvec, P.pvec, Q.beta) as f
FROM
( SELECT {lanczos_sql}
FROM {source_table},
{pq_table_prefix}_q AS Q
WHERE Q.id = {i}
) t1,
{pq_table_prefix}_p AS P,
{pq_table_prefix}_q AS Q
WHERE
P.id = {i} - 1 AND Q.id = {i}
) s
""".format(lanczos_sql=lanczos_sql, schema_madlib=schema_madlib,
source_table=source_table, pq_table_prefix=pq_table_prefix,
row_dim=row_dim, i=i))
# Orthogonalize the current set of Q vectors
plpy.execute("""
INSERT INTO {pq_table_prefix}_q
SELECT id, (f).*
FROM (
SELECT
{i} + 1 as id,
{schema_madlib}.__svd_gram_schmidt_orthogonalize_agg(
qvec_new, Q.qvec) as f
FROM
( SELECT {schema_madlib}.__svd_lanczos_qvec(
x_trans_pvec, Q.qvec, P.alpha) AS qvec_new
FROM (
SELECT {lanczos_sql_trans}
FROM {x_trans}, {pq_table_prefix}_p AS P
WHERE P.id = {i}
) t1,
{pq_table_prefix}_p AS P,
{pq_table_prefix}_q AS Q
WHERE P.id = {i} AND Q.id = {i}
) t1, {pq_table_prefix}_q AS Q
) s
""".format(lanczos_sql_trans=lanczos_sql_trans,
schema_madlib=schema_madlib, x_trans=x_trans,
pq_table_prefix=pq_table_prefix, col_dim=col_dim, i=i))
# Output results as database tables
_lanczos_bidiagonalize_output_results(schema_madlib, output_table_prefix,
pq_table_prefix, nIterations)
plpy.execute("""
DROP TABLE IF EXISTS {pq_table_prefix}_p;
DROP TABLE IF EXISTS {pq_table_prefix}_q;
DROP TABLE IF EXISTS {x_trans}
""".format(pq_table_prefix=pq_table_prefix, x_trans=x_trans))
# ------------------------------------------------------------------------
def lanczos_bidiagonalize_sparse(schema_madlib, source_table,
row_id, col_id, val,
output_table_prefix, nIterations, k):
""" Compute SVD of sparse matrix using Lanczos bidiagonalization algorithm """
global actual_lanczos_iterations
# Get the dimensions of the matrix
[row_dim, col_dim] = get_dims(source_table,
{'row': row_id, 'col': col_id, 'val': val})
inplace_trans = False
if row_dim < col_dim:
tmp = row_dim
row_dim = col_dim
col_dim = tmp
inplace_trans = True
# Limit the size of matrix B (which will be kept in memory)
# See note at top about MAX_LANCZOS_STEPS
if nIterations is None:
nIterations = min(col_dim, MAX_LANCZOS_STEPS)
elif nIterations < k or nIterations > col_dim:
plpy.error("SVD error: n_Iterations should be in the"
"range of [{k}, {col_dim}]".format(k=k, col_dim=col_dim))
elif nIterations > MAX_LANCZOS_STEPS:
plpy.error("SVD error: n_Iterations"
" should be less than {0}".format(MAX_LANCZOS_STEPS))
actual_lanczos_iterations = nIterations
pq_table_prefix = "pg_temp." + unique_string() + "_8"
_lanczos_bidiagonalize_create_pq_table(
schema_madlib, pq_table_prefix, col_dim)
lanczos_sql = """
{schema_madlib}.__svd_sparse_lanczos_agg(
{row_id}::int4, {col_id}::int4, {val}::float8, Q.qvec, {row_dim}) AS x_qvec
""".format(schema_madlib=schema_madlib,
row_dim=row_dim,
row_id=col_id if inplace_trans else row_id,
col_id=row_id if inplace_trans else col_id,
val=val)
lanczos_sql_trans = """
{schema_madlib}.__svd_sparse_lanczos_agg(
{row_id}::int4, {col_id}::int4, {val}::float8, P.pvec, {col_dim}::int4) AS x_trans_pvec
""".format(schema_madlib=schema_madlib,
col_dim=col_dim,
row_id=row_id if inplace_trans else col_id,
col_id=col_id if inplace_trans else row_id,
val=val)
# Perform Lanczos iteration
for i in range(1, nIterations + 1):
plpy.execute("""
INSERT INTO {pq_table_prefix}_p
SELECT id, (f).*
FROM (
SELECT
{i} as id,
{schema_madlib}.__svd_lanczos_pvec(x_qvec, P.pvec, Q.beta) as f
FROM (
SELECT {lanczos_sql}
FROM {source_table},
{pq_table_prefix}_q AS Q
WHERE Q.id = {i} AND {val} IS NOT NULL
) t1,
{pq_table_prefix}_p AS P,
{pq_table_prefix}_q AS Q
WHERE P.id = {i} - 1 AND Q.id = {i}
) s
""".format(lanczos_sql=lanczos_sql, schema_madlib=schema_madlib,
source_table=source_table, row_dim=row_dim, i=i, val=val,
pq_table_prefix=pq_table_prefix))
# Orthogonalize the current set of Q vectors
plpy.execute("""
INSERT INTO {pq_table_prefix}_q
SELECT id, (f).*
FROM (
SELECT
{i} + 1 as id,
{schema_madlib}.__svd_gram_schmidt_orthogonalize_agg(
qvec_new, Q.qvec) as f
FROM (
SELECT {schema_madlib}.__svd_lanczos_qvec(
x_trans_pvec, Q.qvec, P.alpha) AS qvec_new
FROM (
SELECT {lanczos_sql_trans}
FROM {source_table}, {pq_table_prefix}_p AS P
WHERE P.id = {i} AND {val} IS NOT NULL
) t1,
{pq_table_prefix}_p AS P,
{pq_table_prefix}_q AS Q
WHERE P.id = {i} AND Q.id = {i}
) t1,
{pq_table_prefix}_q AS Q
) s
""".format(lanczos_sql_trans=lanczos_sql_trans, val=val,
schema_madlib=schema_madlib, source_table=source_table,
pq_table_prefix=pq_table_prefix, col_dim=col_dim, i=i))
# Output results as database tables
_lanczos_bidiagonalize_output_results(schema_madlib, output_table_prefix,
pq_table_prefix, nIterations)
plpy.execute("""
DROP TABLE IF EXISTS {pq_table_prefix}_p;
DROP TABLE IF EXISTS {pq_table_prefix}_q;
""".format(pq_table_prefix=pq_table_prefix))
# -------------------------------------------------------------------------
def svd_help_message(schema_madlib, message, **kwargs):
"""
Given a help string, provide usage information
"""
if message is not None and \
message.lower() in ("usage", "help", "?"):
return """
-----------------------------------------------------------------------
USAGE
-----------------------------------------------------------------------
There are multiple ways to use the SVD module:
Dense matrices:
SELECT {schema_madlib}.svd(
source_table, -- TEXT, Source table name (dense matrix)
output_table_prefix, -- TEXT, Prefix for output tables
row_id, -- TEXT, ID for each row
k, -- INTEGER, Number of singular vectors to compute
nIterations, -- INTEGER, Number of iterations to run (OPTIONAL)
result_summary_table, -- TEXT Table name to store result summary (OPTIONAL)
);
Sparse matrices:
SELECT {schema_madlib}.svd_sparse(
source_table, -- TEXT, Source table name (sparse matrix)
output_table_prefix, -- TEXT, Prefix for output tables
row_id, -- TEXT, ID for each row
col_id, -- TEXT, ID for each column
value, -- TEXT, Non-zero values of the sparse matrix
row_dim, -- INTEGER, Row dimension of sparse matrix
col_dim, -- INTEGER, Col dimension of sparse matrix
k, -- INTEGER, Number of singular vectors to compute
nIterations, -- INTEGER, Number of iterations to run (OPTIONAL)
result_summary_table -- TEXT Table name to store result summary (OPTIONAL)
);
Block matrices:
SELECT {schema_madlib}.svd_block(
source_table, -- TEXT, Source table name (block matrix)
output_table_prefix, -- TEXT, Prefix for output tables
k, -- INTEGER, Number of singular vectors to compute
nIterations, -- INTEGER, Number of iterations to run (OPTIONAL)
result_summary_table -- TEXT Table name to store result summary (OPTIONAL)
);
Native implementation for sparse matrix:
SELECT {schema_madlib}.svd_sparse_native(
source_table, -- TEXT, Source table name (sparse matrix)
output_table_prefix, -- TEXT, Prefix for output tables
row_id, -- TEXT, ID for each row
col_id, -- TEXT, ID for each column
value, -- TEXT, Non-zero values of the sparse matrix
row_dim, -- INTEGER, Row dimension of sparse matrix
col_dim, -- INTEGER, Col dimension of sparse matrix
k, -- INTEGER, Number of singular vectors to compute
lanczos_iter, -- INTEGER, Number of iterations to run, optional
result_summary_table -- TEXT Table name to store result summary, optional
);
-------------------------------------------------------------------------
OUTPUT TABLE
-------------------------------------------------------------------------
Ouptut for eigen vectors/values will be in the following 3 tables:
- Left singular matrix: Table named <output_table_prefix>_left (e.g. 'netflix_u')
- Right singular matrix: Table named <output_table_prefix>_right (e.g. 'netflix_v')
- Singular values: Table named <output_table_prefix>_s (e.g. 'netflix_s')
The singular vector tables are of the format:
row_id INTEGER, -- ID corresponding to each eigen value (in decreasing order)
row_vec FLOAT8[] -- Singular vector elements for this row_id
Each array is of size k
The singular values table is in a sparse table format:
row_id INTEGER, -- i for ith eigen value
col_id INTEGER, -- i for ith eigen value (same as row_id)
value FLOAT8 -- Eigen Value
-------------------------------------------------------------------------
RESULT SUMMARY TABLE
-------------------------------------------------------------------------
Result summary table will be in the following format:
rows_used INTEGER, -- Number of rows used for SVD calculation
exec_time (ms) FLOAT8, -- Total time for executing SVD
iter INTEGER, -- Total number of iterations run
recon_error FLOAT8 -- Total quality score (i.e. approximation quality) for this set of
orthonormal basis
""".format(schema_madlib=schema_madlib)
else:
return """
In linear algebra, the singular value decomposition (SVD) is a
factorization of a real or complex matrix, with many useful
applications in signal processing and statistics.
-------
For an overview on usage, run:
SELECT {schema_madlib}.svd('usage');
""".format(schema_madlib=schema_madlib)
# -------------------------------------------------------------------------
def svd_vec_trans_mult_matrix(schema_madlib, vec_table, mat_table,
k, res_table, is_left):
""" Compute the product of multiple vectors with a matrix stored in a table """
matrix = ('right_matrix', 'left_matrix')[is_left]
array_sum = m4_ifdef(`__POSTGRESQL__', `schema_madlib + ".__array_sum"', `"sum"')
plpy.execute("""
CREATE TABLE {res_table} AS
SELECT
row_id::INT4,
{array_sum}(row_vec) AS row_vec
FROM (
SELECT (f).row_id + 1 as row_id, (f).row_vec AS row_vec
FROM (
SELECT {schema_madlib}.__svd_vec_trans_mult_matrix_internal(
row_vec::FLOAT8[],
(SELECT (svd_output).{matrix} FROM {mat_table}),
row_id::INT4,
{k}
) AS f
FROM {vec_table}
) t1
) t2
GROUP BY row_id
""".format(**locals()))
# -------------------------------------------------------------------------
def _transpose_UV(schema_madlib, u_table_name, v_table_name):
"""
Swap the table names for the U and V tables. This effectively swaps U and V
and is useful when the SVD of the transpose of a matrix is required
Args:
@param schema_madlib:
@param u_table_name:
@param v_table_name:
Returns:
None
"""
tmp_name = unique_string()
tmp_name = rename_table(schema_madlib, u_table_name, tmp_name)
# we reset the tmp_name since it's possible that u_table_name was schema_qualified,
# in which case tmp_name will be set to that schema.
rename_table(schema_madlib, v_table_name, u_table_name)
rename_table(schema_madlib, tmp_name, v_table_name)
return None