blob: 4fc9a1584cd199695cc43757f04cc92f81de5911 [file] [log] [blame]
"""@file matrix_ops.py_in
@namespace linalg
"""
import plpy
import sys
from random import randint
from utilities.utilities import __mad_version
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string
from utilities.utilities import _assert
from utilities.utilities import add_postfix
from utilities.validate_args import get_cols
from utilities.validate_args import is_col_array
from utilities.validate_args import table_exists
from utilities.validate_args import table_is_empty
from utilities.validate_args import columns_exist_in_table
# Version-dependent SQL helper names, picked once at import time by the
# version wrapper returned from __mad_version().
version_wrapper = __mad_version()
string_to_array, array_to_string = (version_wrapper.select_vecfunc(),
                                    version_wrapper.select_vec_return())
# ------------------------------------------------------------------------------
# -- Utility functions ---------------------------------------------------------
# ------------------------------------------------------------------------------
def _matrix_column_to_array_format(source_table, row_id, output_table,
                                   istemp=False):
    """
    Convert a dense matrix stored as one table column per matrix column into
    the array format (row_id, row_vec).

    Args:
        @param source_table: str, table holding the matrix. Apart from row_id,
                             every column must have a numeric data type.
        @param row_id: str, name of the row-id column in source_table
        @param output_table: str, table to create (must not exist yet)
        @param istemp: bool, create output_table as a TEMP table when True

    Returns:
        None. Side effect: creates output_table with columns row_id and
        row_vec (double precision[]); row_vec lists the non-row-id columns in
        their ordinal order.
    """
    _validate_output_table(output_table)
    _validate_input_table(source_table)

    # Resolve the table name to its catalog schema and relation name so that
    # information_schema.columns can be queried for it.
    location = plpy.execute("""
        SELECT nspname AS table_schema, relname AS table_name
        FROM pg_class AS c, pg_namespace AS nsp
        WHERE c.oid = '{source_table}'::regclass::oid AND
              c.relnamespace = nsp.oid
        """.format(source_table=source_table))[0]

    column_info = plpy.execute("""
        SELECT quote_ident(column_name) as column_name, data_type
        FROM
            information_schema.columns
        WHERE
            table_schema = '{table_schema}' AND
            table_name = '{table_name}'
        ORDER BY ordinal_position
        """.format(table_schema=location['table_schema'],
                   table_name=location['table_name']))

    accepted_types = frozenset(('smallint', 'integer', 'bigint',
                                'real', 'numeric', 'double precision'))
    ordered_names = []
    numeric_names = set()
    for info in column_info:
        ordered_names.append(info['column_name'])
        if info['data_type'] in accepted_types:
            numeric_names.add(info['column_name'])

    _assert(row_id in ordered_names, 'No row_id in the input table')
    ordered_names.remove(row_id)
    _assert(numeric_names.issuperset(ordered_names),
            "Not all columns are numeric!")

    temp_keyword = ("", "TEMP")[istemp]
    plpy.execute("""
        CREATE {temp_str} TABLE {output_table}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {row_id} as row_id,
            array[{val_col_names}]::double precision[] AS row_vec
        FROM
            {source_table}
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY (row_id)')
        """.format(output_table=output_table,
                   row_id=row_id,
                   source_table=source_table,
                   temp_str=temp_keyword,
                   val_col_names=','.join(ordered_names)))
def create_temp_sparse_matrix_table_with_dims(source_table,
                                              out_table,
                                              row_id, col_id, value,
                                              row_dim, col_dim,
                                              sparse_where_condition=None):
    """
    Copy a sparse matrix into a new table and, if needed, append one
    (row_dim, col_dim, 0) entry so the copy reports the requested dimensions.

    The new table is created with a plain CREATE TABLE. If a temporary table
    is wanted, the caller should pass a name in the pg_temp schema. Only
    entries whose value is not NULL are copied. After the copy, get_dims is
    run on it. If the dimensions it reports differ from (row_dim, col_dim),
    a single entry (row_dim, col_dim, 0) is inserted.

    Args:
        @param source_table: str, source table (sparse matrix)
        @param out_table: str, table to create (sparse matrix)
        @param row_id: str, name of the row-index column
        @param col_id: str, name of the column-index column
        @param value: str, name of the value column
        @param row_dim: int, requested number of rows
        @param col_dim: int, requested number of columns
        @param sparse_where_condition: str, optional extra SQL appended
            verbatim after "WHERE <value> is not NULL". It must therefore
            begin with a connective, e.g. "AND <condition>".

    Returns:
        None
    """
    sparse_where_condition = sparse_where_condition or ''
    plpy.execute("""
        CREATE TABLE {out_table} as
        SELECT
            {row_id},
            {col_id},
            {value}
        FROM {source_table}
        WHERE {value} is not NULL
        {sparse_where_condition}
        """.format(row_id=row_id,
                   col_id=col_id,
                   value=value,
                   source_table=source_table,
                   out_table=out_table,
                   sparse_where_condition=sparse_where_condition))
    res_row_dim, res_col_dim = get_dims(out_table, {'row': row_id,
                                                    'col': col_id,
                                                    'val': value})
    if res_row_dim != row_dim or res_col_dim != col_dim:
        # The new table has columns (row_id, col_id, value) in that order,
        # so a positional VALUES list lines up with them.
        plpy.execute("""
            INSERT INTO {out_table} VALUES ({row_dim}, {col_dim}, 0)
            """.format(out_table=out_table, row_dim=row_dim, col_dim=col_dim))
def _is_sparse(matrix_in, val):
    """Return True when column val of matrix_in is not an array column,
    i.e. the matrix is stored in the sparse (row, col, val) layout."""
    is_dense = is_col_array(matrix_in, val)
    return not is_dense
def _matrix_default_args():
return {
'row': 'row_num',
'col': 'col_num',
'val': 'val',
'trans': False
}
def parse_matrix_args(matrix_args, in_default_args=None):
    """ Turn a matrix argument specification into a dictionary.

    Args:
        @param matrix_args: dict, str or empty value.
            - empty/None: the defaults are returned
            - dict: returned unchanged (defaults are NOT merged in)
            - str: parsed as comma-separated "key=value" pairs
            Recognised keys:
                row   = name of the column containing the row id
                col   = name of the column containing the column id
                val   = name of the column containing the values
                trans = bool, transpose the matrix before the operation
                fmt   = output format, 'dense' or 'sparse' (case-insensitive)
        @param in_default_args: dict, optional overrides for the defaults

    Returns:
        dict mapping each argument name to its value.

    Raises:
        ValueError if matrix_args is neither a dict nor a str; an error via
        _assert if fmt is given but is not 'dense' or 'sparse'.
    """
    defaults = _matrix_default_args()
    if in_default_args:
        defaults.update(in_default_args)

    if not matrix_args:
        return defaults
    if isinstance(matrix_args, dict):
        return matrix_args
    if not isinstance(matrix_args, str):
        raise ValueError("Matrix error: invalid input for matrix args")

    expected_types = dict(row=str, col=str, val=str, trans=bool, fmt=str)
    parsed = extract_keyvalue_params(matrix_args, expected_types, defaults)

    out_format = parsed['fmt'] if 'fmt' in parsed else None
    if out_format:
        out_format = out_format.lower()
        parsed['fmt'] = out_format
        _assert(out_format in ('sparse', 'dense'),
                "Matrix error: invalid input for matrix args 'fmt', it should be 'dense'"
                " or 'sparse' but provided input is '{0}'".format(out_format))
    return parsed
def cast_dense_input_table_to_correct_columns(schema_madlib, matrix_in,
                                              matrix_out, row_id):
    """
    Make a dense input matrix available in the (row_id, row_vec) layout.

    If matrix_in has exactly two columns, row_id and an array column:
      - when they are already named row_id and row_vec, nothing is created;
      - otherwise matrix_out is created as a copy with the columns renamed.
    Any other table is treated as one numeric column per matrix column and
    is converted into matrix_out via __matrix_column_to_array_format.

    Args:
        @param schema_madlib: str, schema holding the MADlib functions
        @param matrix_in: str, input table
        @param matrix_out: str, table to create if a conversion is needed
        @param row_id: str, name of the row-id column in matrix_in

    Returns:
        bool: True if matrix_out was created, False otherwise.
    """
    _validate_output_table(matrix_out)
    _validate_input_table(matrix_in)
    cols = get_cols(matrix_in, schema_madlib)
    created = False
    if len(cols) == 2:
        # Report a missing row id column clearly instead of letting
        # list.remove raise a bare ValueError.
        _assert(row_id in cols,
                "Matrix error: Row id column ({0}) not found in table "
                "({1})".format(row_id, matrix_in))
        cols.remove(row_id)
        vec_col = cols[0]
        if not is_col_array(matrix_in, vec_col):
            plpy.error("SVD error: Data column should be of type array!")
        if vec_col != "row_vec" or row_id != "row_id":
            plpy.execute(
                """
                CREATE TABLE {matrix_out} as
                SELECT {row_id} as row_id, {vec} as row_vec
                FROM {matrix_in}
                """.format(matrix_out=matrix_out,
                           row_id=row_id, vec=vec_col, matrix_in=matrix_in))
            created = True
    else:
        plpy.execute(
            """
            SELECT {schema_madlib}.__matrix_column_to_array_format (
                '{matrix_in}', '{row_id}', '{matrix_out}', False)
            """.format(schema_madlib=schema_madlib, matrix_in=matrix_in,
                       row_id=row_id,
                       matrix_out=matrix_out))
        created = True
    return created
def matrix_ndims(schema_madlib, matrix_in, in_args, is_block):
    """Validate matrix_in for its storage layout and return its dimensions.

    Args:
        @param schema_madlib: str, MADlib schema (not used here)
        @param matrix_in: str, table holding the matrix
        @param in_args: str or dict, matrix argument specification
        @param is_block: bool, True if matrix_in is a block matrix

    Returns:
        tuple (row_dim, col_dim) as computed by get_dims.
    """
    _validate_input_table(matrix_in)
    parsed = parse_matrix_args(in_args)
    if is_block:
        validator = _validate_block
    elif _is_sparse(matrix_in, parsed['val']):
        validator = validate_sparse
    else:
        validator = validate_dense
    validator(matrix_in, parsed)
    return get_dims(matrix_in, parsed, is_block)
def get_dims(matrix, matrix_args, is_block=False):
    """
    Return the dimensions (row_dim, col_dim) of a matrix table.

    Three layouts are handled:
      - block (is_block=True): uses the largest block ids. It assumes every
        block has the size of block (1, 1), except the last block row and
        column, whose size is read from block (max row id, max col id).
      - sparse (val column is not an array): the maximum row/col index over
        entries whose value is not NULL, or 0 when there are none.
      - dense (val column is an array): row_dim is the maximum row id (0 if
        none). col_dim is the common array length. An error is raised if the
        arrays have different lengths. col_dim is 0 when row_dim is 0.

    Args:
        @param matrix: str, table holding the matrix
        @param matrix_args: dict with 'row', 'col', 'val' keys ('col' is not
                            read for dense matrices)
        @param is_block: bool, True if matrix is a block matrix

    Returns:
        tuple (row_dim, col_dim)
    """
    if is_block:
        last_ids = plpy.execute("""
            SELECT
                max({matrix_args[row]}) as max_row_id,
                max({matrix_args[col]}) as max_col_id
            FROM {matrix}
            """.format(matrix=matrix, matrix_args=matrix_args))[0]
        size_query = """
            SELECT
                (array_upper({matrix_args[val]}, 1) -
                 array_lower({matrix_args[val]}, 1) + 1) AS block_row_dim,
                (array_upper({matrix_args[val]}, 2) -
                 array_lower({matrix_args[val]}, 2) + 1) AS block_col_dim
            FROM {matrix}
            WHERE {matrix_args[row]} = {block_row} AND
                  {matrix_args[col]} = {block_col}
            """
        # Block (1, 1) has the regular block size ...
        full_block = plpy.execute(size_query.format(
            matrix=matrix, matrix_args=matrix_args,
            block_row=1, block_col=1))[0]
        # ... the bottom-right block has the (possibly smaller) size of the
        # last block row and last block column.
        tail_block = plpy.execute(size_query.format(
            matrix=matrix, matrix_args=matrix_args,
            block_row=last_ids['max_row_id'],
            block_col=last_ids['max_col_id']))[0]
        return ((last_ids['max_row_id'] - 1) * full_block['block_row_dim'] +
                tail_block['block_row_dim'],
                (last_ids['max_col_id'] - 1) * full_block['block_col_dim'] +
                tail_block['block_col_dim'])

    if _is_sparse(matrix, matrix_args['val']):
        cols = [matrix_args[key] for key in ('row', 'col', 'val')]
        _assert(columns_exist_in_table(matrix, cols),
                "Matrix error: At least one of the columns ({1}) missing "
                "from matrix {0}".format(matrix, ','.join(cols)))
        extent = plpy.execute("""
            SELECT
                max({matrix_args[row]}::BIGINT) AS row_dim,
                max({matrix_args[col]}::BIGINT) AS col_dim
            FROM {matrix}
            WHERE {matrix_args[val]} IS NOT NULL
            """.format(matrix=matrix, matrix_args=matrix_args))[0]
        n_rows, n_cols = extent['row_dim'], extent['col_dim']
        return (0 if n_rows is None else n_rows,
                0 if n_cols is None else n_cols)

    cols = [matrix_args[key] for key in ('row', 'val')]
    _assert(columns_exist_in_table(matrix, cols),
            "Matrix error: At least one of the columns ({1}) missing "
            "from matrix {0}".format(matrix, ','.join(cols)))
    max_row = plpy.execute("""
        SELECT max({matrix_args[row]}::BIGINT) AS row_dim
        FROM {matrix}
        """.format(matrix=matrix, matrix_args=matrix_args))[0]['row_dim']
    n_rows = 0 if max_row is None else max_row
    if n_rows == 0:
        return (n_rows, 0)
    lengths = plpy.execute("""
        SELECT
            DISTINCT(array_upper({matrix_args[val]}::float8[], 1) -
                     array_lower({matrix_args[val]}::float8[], 1) + 1)
            AS col_dim
        FROM
            {matrix}
        """.format(matrix=matrix, matrix_args=matrix_args))
    _assert(lengths and len(lengths) == 1,
            "dimensions mismatch: row_array.size() != vec.size(). "
            "Data contains different sized arrays")
    return (n_rows, lengths[0]['col_dim'])
# ------------------------------------------------------------------------------
# -- Validation functions ------------------------------------------------------
# ------------------------------------------------------------------------------
def _validate_input_array(diag_elements):
    """Raise (via _assert) unless diag_elements is a non-None, non-empty
    sequence."""
    _assert(diag_elements is not None,
            "Matrix error: Invalid diag elements")
    element_count = len(diag_elements)
    _assert(element_count > 0,
            "Matrix error: Empty diag elements")
def _validate_output_table(table):
    """Raise (via _assert) unless table is a non-blank name of a table that
    does not exist yet. Double quotes are ignored for the blank check."""
    has_name = table is not None and bool(table.replace('"', '').strip())
    _assert(has_name,
            "Matrix error: Invalid output table ({0})".format(table))
    already_present = table_exists(table)
    _assert(not already_present,
            "Matrix error: The output table ({0}) already exists".format(table))
def _validate_input_table(table):
    """Raise (via _assert) unless table is a non-blank name of an existing
    table. Double quotes are ignored for the blank check."""
    has_name = table is not None and bool(table.replace('"', '').strip())
    _assert(has_name,
            "Matrix error: Invalid input table ({0})".format(table))
    present = table_exists(table)
    _assert(present,
            "Matrix error: The input table ({0}) doesn't exist".format(table))
def validate_matrix(matrix, matrix_args, check_col=True):
    """Dispatch to validate_sparse or validate_dense depending on whether the
    val column of matrix is an array column."""
    checker = (validate_sparse if _is_sparse(matrix, matrix_args['val'])
               else validate_dense)
    checker(matrix, matrix_args, check_col=check_col)
def validate_sparse(matrix, matrix_args, check_col=True):
    """
    Validate a matrix stored in the sparse (row, col, val) format.

    Checks that:
      - the table exists and is not empty;
      - (if check_col) the row/col/val columns exist and val is not an array;
      - every row/column index lies in [1, row_dim] / [1, col_dim], where the
        dimensions come from get_dims;
      - no (row, col) pair occurs more than once with a non-NULL value.
    Any failure raises an error through _assert or plpy.error.

    Args:
        @param matrix: str, table holding the matrix
        @param matrix_args: dict with 'row', 'col' and 'val' column names
        @param check_col: bool, also verify column presence and types
    """
    _assert(table_exists(matrix),
            "Matrix error: Could not find input table " + matrix)
    _assert(not table_is_empty(matrix),
            "Matrix error: The input table ({0}) is empty".format(matrix))
    if check_col:
        _assert(columns_exist_in_table(
                    matrix, [matrix_args[i] for i in ('row', 'col', 'val')]),
                "Matrix error: Missing columns from matrix {0}".format(matrix))
        _assert(not is_col_array(matrix, matrix_args['val']),
                "Matrix error: invalid column ({0}) should not be an array "
                "datatype".format(matrix_args['val']))

    # verify that row and col entries are in the right range
    row_dim, col_dim = get_dims(matrix, matrix_args)
    # Only the existence of an offending entry matters, so fetch at most one.
    invalid_rows = plpy.execute("""
        -- Check if only row indices in [1, row_dim] and
        -- col_indices in [1, col_dim] are present
        SELECT {matrix_args[row]} as r, {matrix_args[col]} as c
        FROM
            {matrix}
        WHERE
            {matrix_args[row]} < 1 OR
            {matrix_args[col]} < 1 OR
            {matrix_args[row]} > {row_dim} OR
            {matrix_args[col]} > {col_dim}
        LIMIT 1;
        """.format(matrix=matrix, matrix_args=matrix_args,
                   row_dim=row_dim, col_dim=col_dim))
    if invalid_rows:
        plpy.error("Matrix error: Invalid index in matrix ({0}). "
                   "All row and column indices must be in the range [1, DIM]".format(matrix))

    # verify that the matrix row_id, col_id elements are unique; only the
    # first MAX_ROWS duplicates are displayed, so only those are fetched.
    MAX_ROWS = 20
    non_unique_rows = plpy.execute("""
        SELECT {matrix_args[row]} as r, {matrix_args[col]} as c
        FROM {matrix}
        WHERE {matrix_args[val]} IS NOT NULL
        GROUP BY {matrix_args[row]}, {matrix_args[col]}
        HAVING count(*) > 1
        ORDER BY r, c
        LIMIT {max_rows}
        """.format(matrix=matrix, matrix_args=matrix_args, max_rows=MAX_ROWS))
    if non_unique_rows:
        bad_rows = [str(i['r']) + " | " + str(i['c']) for i in non_unique_rows]
        plpy.error(
            """Matrix error: Following entries are duplicated in sparse matrix ({m}).
            (Displaying a maximum of {M_R} entries)
            {row} | {col}
            -------+---------------------------
            {all_rows}
            Error: The above entries (only {M_R} shown) were duplicated in sparse matrix ({m})
            """.format(m=matrix,
                       row=matrix_args['row'],
                       col=matrix_args['col'],
                       all_rows="\n".join(bad_rows),
                       M_R=MAX_ROWS))
def validate_dense(matrix, matrix_args, check_col=True, row_dim=None):
    """
    Validate a matrix stored in the dense (row id, array) format.

    Checks that the table exists, and, if check_col is set, that the row and
    val columns exist and that val is an array column. It then checks that
    the row ids are unique and are exactly 1..row_dim, with none missing and
    none extra. Any failure raises an error through _assert or plpy.error.

    Args:
        @param matrix: str, table holding the matrix
        @param matrix_args: dict with 'row' and 'val' column names
        @param check_col: bool, also verify column presence and types
        @param row_dim: int, expected number of rows; computed as the
                        maximum row id when None
    """
    _assert(table_exists(matrix), "Matrix error: Could not find table {0}".format(matrix))
    if check_col:
        cols = [matrix_args[i] for i in ('row', 'val')]
        _assert(columns_exist_in_table(matrix, cols),
                "Matrix error: Missing columns (one of {0}) "
                "from matrix {1}".format(','.join(cols), matrix))
        _assert(is_col_array(matrix, matrix_args['val']),
                "Matrix error: invalid column ({0}) should be an array "
                "datatype".format(matrix_args['val']))

    # verify that the matrix row_id, col_id elements are unique and not missing
    if row_dim is None:
        row_dim = plpy.execute("SELECT max({0}::bigint) as max_row_dim FROM {1}".
                               format(matrix_args['row'], matrix))[0]['max_row_dim']
        if row_dim is None:
            # max() over a table without row ids is NULL. Use 0 (the same
            # convention as get_dims) so that the query below gets a valid
            # numeric bound rather than the text "None".
            row_dim = 0
    MAX_ROWS = 20
    c = unique_string()
    non_unique_rows = plpy.execute("""
        -- Check duplicates
        SELECT {matrix_args[row]}::BIGINT as r
        FROM {matrix}
        GROUP BY {matrix_args[row]}
        HAVING (count(*) > 1)
        UNION
        -- Check if all (and only) indices in [1, row_dim] are present
        SELECT coalesce({c}::BIGINT, {matrix_args[row]}::BIGINT) as r
        FROM (
            SELECT {matrix_args[row]}::BIGINT, {c}
            FROM
                {matrix}
            FULL OUTER JOIN
                generate_series(1, {row_dim}) as {c}
            ON ({matrix_args[row]}::BIGINT = {c}::bigint)
        ) q
        WHERE
            {matrix_args[row]} is NULL OR
            {c} is NULL
        ORDER BY r
        LIMIT {max_rows};
        """.format(matrix=matrix, matrix_args=matrix_args, c=c,
                   row_dim=row_dim, max_rows=MAX_ROWS))
    if non_unique_rows:
        bad_rows = [str(i['r']) for i in non_unique_rows]
        plpy.error(
            """Matrix error: Following {row} values are invalid in dense matrix ({m}).
            (Displaying a maximum of {M_R} entries)
            {row}
            ------------
            {all_rows}
            Error: Above {row} values are invalid in dense matrix ({m}).
            """.format(m=matrix,
                       row=matrix_args['row'],
                       all_rows="\n".join(bad_rows),
                       M_R=MAX_ROWS))
def _validate_block(matrix, matrix_args):
    """Check that a block matrix table exists, has its row/col/val columns,
    stores blocks in an array column, and is not empty."""
    _assert(table_exists(matrix), "The input table {0} doesn't exist".format(matrix))
    block_cols = [matrix_args['row'], matrix_args['col'], matrix_args['val']]
    missing_msg = ("Matrix error: One or more input columns ({0})"
                   " don't exist in the table ({1}) representing a block matrix"
                   .format(','.join(block_cols), matrix))
    _assert(columns_exist_in_table(matrix, block_cols), missing_msg)
    _assert(is_col_array(matrix, matrix_args['val']),
            "Matrix error: Invalid block column - array expected")
    _assert(not table_is_empty(matrix),
            "Matrix error: Input table {0} is empty".format(matrix))
# ------------------------------------------------------------------------------
# -- Transformation operations -------------------------------------------------
# ------------------------------------------------------------------------------
def matrix_sparsify(schema_madlib, matrix_in, in_args, matrix_out, out_args):
    """
    Convert a dense (row id, array) matrix into the sparse (row, col, val)
    format.

    Only elements for which "val <> 0" holds are stored, so zeros and NULL
    elements are dropped. If the stored entries imply smaller dimensions than
    the input has, an explicit (row_dim, col_dim, 0) entry is appended so the
    output keeps the input's dimensions.

    Args:
        @param schema_madlib: str, unused (kept for a consistent API)
        @param matrix_in: str, dense input table
        @param in_args: str or dict, argument specification for matrix_in
        @param matrix_out: str, sparse table to create
        @param out_args: str or dict, argument specification for matrix_out;
                         row/val (and col) default to those of matrix_in
    """
    _ = schema_madlib  # unused parameter - retained for consistent API
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)

    in_args = parse_matrix_args(in_args)
    row_dim, col_dim = get_dims(matrix_in, in_args)
    validate_dense(matrix_in, in_args, row_dim=row_dim)

    inherited = dict(row=in_args['row'], val=in_args['val'])
    if 'col' in in_args:
        inherited['col'] = in_args['col']
    out_args = parse_matrix_args(out_args, in_default_args=inherited)

    col_alias = unique_string()
    val_alias = unique_string()
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {in_args[row]} as {out_args[row]},
            {temp_col_id} as {out_args[col]},
            {temp_val} as {out_args[val]}
        FROM
        (
            SELECT
                {in_args[row]},
                unnest({in_args[val]}) AS {temp_val},
                generate_series(1, {col_dim}) AS {temp_col_id}
            FROM
                {matrix_in}
        ) t1
        WHERE
            {temp_val} <> 0
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({out_args[row]})')
        """.format(matrix_out=matrix_out, matrix_in=matrix_in,
                   in_args=in_args, out_args=out_args,
                   temp_col_id=col_alias, temp_val=val_alias,
                   col_dim=col_dim))

    out_dims = tuple(get_dims(matrix_out, out_args))
    if out_dims != (row_dim, col_dim):
        plpy.execute("""
            INSERT INTO {matrix_out} VALUES ({row_dim}, {col_dim}, 0)
            """.format(matrix_out=matrix_out, row_dim=row_dim, col_dim=col_dim))
def matrix_densify(schema_madlib, matrix_in, in_args, matrix_out, out_args):
    """
    Convert a sparse (row, col, val) matrix into the dense (row, array)
    format.

    Each row with at least one non-NULL value is aggregated into an array of
    length col_dim with __matrix_densify_agg. Every other row index in
    [1, row_dim] gets an all-zero array. This includes rows whose entries
    are all NULL.

    Args:
        @param schema_madlib: str, schema holding the MADlib functions
        @param matrix_in: str, sparse input table
        @param in_args: str or dict, argument specification for matrix_in
        @param matrix_out: str, dense table to create
        @param out_args: str or dict, argument specification for matrix_out;
                         row/val default to those of matrix_in
    """
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)
    in_args = parse_matrix_args(in_args)
    validate_sparse(matrix_in, in_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': in_args['row'],
                                                  'val': in_args['val']})
    (row_dim, col_dim) = get_dims(matrix_in, in_args)
    # Each dense row is a single array of col_dim elements.
    _assert(col_dim < sys.maxint,
            "Matrix error: Matrix {0} has too many columns. This cannot be "
            "represented in a dense format due to "
            "restrictions on maximum array size.".format(matrix_in))
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {in_args[row]} AS {out_args[row]},
            {schema_madlib}.__matrix_densify_agg(
                {col_dim}::integer,
                {in_args[col]}::integer,
                {in_args[val]}) AS {out_args[val]}
        FROM
            {matrix_in}
        WHERE
            {in_args[val]} IS NOT NULL
        GROUP BY
            {in_args[row]}
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({out_args[row]})')
        """.format(matrix_out=matrix_out, matrix_in=matrix_in,
                   schema_madlib=schema_madlib, col_dim=col_dim,
                   in_args=in_args, out_args=out_args))

    # Add vector with all zeros for rows not produced above. The subquery
    # uses the same "value IS NOT NULL" filter as the aggregation, so rows
    # holding only NULL values are filled as well. NULL row ids are excluded
    # because a NULL inside NOT IN would make the predicate never true.
    plpy.execute("""
        INSERT INTO {matrix_out}
        SELECT row as {out_args[row]}, val as {out_args[val]}
        FROM
        (
            SELECT row
            FROM generate_series(1, {row_dim}) AS row
            WHERE row NOT IN
            (
                SELECT {in_args[row]} as row
                FROM {matrix_in}
                WHERE {in_args[val]} IS NOT NULL AND
                      {in_args[row]} IS NOT NULL
                GROUP BY {in_args[row]}
            )
        ) t1,
        (
            SELECT array_agg(val * 0) AS val
            FROM generate_series(1, {col_dim}) AS val
        ) t2
        """.format(matrix_out=matrix_out, matrix_in=matrix_in,
                   row_dim=row_dim, col_dim=col_dim,
                   in_args=in_args, out_args=out_args))
# ------------------------------------------------------------------------------
# -- Element-wise operations ---------------------------------------------------
# ------------------------------------------------------------------------------
def matrix_elem_op(schema_madlib, matrix_a, a_args,
                   matrix_b, b_args, matrix_out, out_args,
                   elem_ops):
    """ Perform element-wise operation on two matrices.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_a: str, Name of the table containing 1st (left) matrix
        @param a_args: str, Name-value pair string containing options for matrix_a
        @param matrix_b: str, Name of the table containing 2nd (right) matrix
        @param b_args: str, Name-value pair string containing options for matrix_b
        @param matrix_out: str, Name of the table to store result matrix
        @param out_args: str, Name-value pair string containing options for matrix_out
        @param elem_ops: dict, Dictionary containing the operation to perform.
                         Requires two parameters:
                            scalar_op: The operation between each pair of
                                       elements (Used for sparse matrices)
                            vector_op: The operation between each pair of
                                       rows (Used for dense matrices)

    Returns:
        None

    Side effect:
        Creates an output table containing the result matrix. If the inputs
        use different formats, one of them is first converted into an
        intermediate pg_temp table. If the result must be converted to the
        requested fmt, it is also staged in an intermediate pg_temp table.
        Both intermediate tables are dropped before returning.
    """
    _validate_input_table(matrix_a)
    _validate_input_table(matrix_b)
    _validate_output_table(matrix_out)

    a_args = parse_matrix_args(a_args)
    b_args = parse_matrix_args(b_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': a_args['row'],
                                                  'col': a_args['col'],
                                                  'val': a_args['val']})
    a_dim = get_dims(matrix_a, a_args)
    b_dim = get_dims(matrix_b, b_args)
    _assert(a_dim[0] == b_dim[0] and a_dim[1] == b_dim[1],
            "Matrix error: The dimensions of the two matrices don't match")

    fmt = out_args['fmt'] if 'fmt' in out_args else None
    is_a_sparse = _is_sparse(matrix_a, a_args['val'])
    is_b_sparse = _is_sparse(matrix_b, b_args['val'])
    transform_a_func, transform_b_func = None, None
    if is_a_sparse == is_b_sparse:
        # if both input formats are same, then directly apply operation ...
        elem_op_func = _matrix_elem_op_sparse if is_a_sparse else _matrix_elem_op_dense
        # ... and transform result if fmt is provided and not same format as input
        to_transform_rst = fmt and ((fmt == "sparse") != is_a_sparse)
        # transform_rst_func matters only if to_transform_rst == True
        transform_rst_func = matrix_densify if is_a_sparse else matrix_sparsify
    else:
        # Different format inputs - change the format for one of the matrices.
        #   if output is sparse (default), change dense input to sparse
        #   if output is dense, change sparse input to dense
        # In this case we don't need to transform output since it is directly
        # produced in necessary format
        to_transform_rst = False
        is_output_sparse = not fmt or fmt == "sparse"
        elem_op_func = _matrix_elem_op_sparse if is_output_sparse else _matrix_elem_op_dense
        if is_output_sparse != is_a_sparse:
            # output and matrix_a are opposite formats ...
            transform_a_func = matrix_densify if is_a_sparse else matrix_sparsify
        else:
            # ... or if a is in the right format, then b is in the wrong format
            transform_b_func = matrix_densify if is_b_sparse else matrix_sparsify

    matrix_out1 = "pg_temp." + unique_string() + "_a"
    # Set to matrix_out1 once a converted copy of an input has been created
    converted_input = None
    if not transform_a_func and not transform_b_func:
        validate_matrix(matrix_a, a_args)
        validate_matrix(matrix_b, b_args)
    elif transform_a_func:
        # transform function internally validates
        transform_a_func(schema_madlib, matrix_a, a_args, matrix_out1, a_args)
        matrix_a = converted_input = matrix_out1
    else:
        transform_b_func(schema_madlib, matrix_b, b_args, matrix_out1, b_args)
        matrix_b = converted_input = matrix_out1

    matrix_out2 = "pg_temp." + unique_string() + "_r"
    matrix_out_tmp = matrix_out2 if to_transform_rst else matrix_out
    elem_op_func(schema_madlib, matrix_a, a_args,
                 matrix_b, b_args, matrix_out_tmp, out_args, elem_ops)
    if to_transform_rst:
        transform_rst_func(schema_madlib, matrix_out_tmp, out_args,
                           matrix_out, out_args)
        plpy.execute("DROP TABLE IF EXISTS " + matrix_out_tmp)
    if converted_input is not None:
        # The converted copy was only needed to compute the result
        plpy.execute("DROP TABLE IF EXISTS " + converted_input)
def _matrix_elem_op_sparse(schema_madlib, matrix_a, a_args,
                           matrix_b, b_args, matrix_out, out_args,
                           elem_ops):
    """
    Element-wise operation on two sparse matrices.

    The non-NULL entries of both inputs are full-outer-joined on (row, col).
    A missing side is treated as 0 and elem_ops['scalar_op'] is applied.
    Only non-zero results are kept. If the kept entries imply smaller
    dimensions than matrix_a has, an explicit (rows, cols, 0) entry is
    appended.
    """
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {out_args[row]},
            {out_args[col]},
            {out_args[val]}
        FROM (
            SELECT
                coalesce(a.{a_args[row]}::bigint, b.{b_args[row]}::bigint) AS {out_args[row]},
                coalesce(a.{a_args[col]}::bigint, b.{b_args[col]}::bigint) AS {out_args[col]},
                coalesce(a.{a_args[val]}::float8, 0::float8)
                    {elem_ops[scalar_op]}
                coalesce(b.{b_args[val]}::float8, 0::float8) AS {out_args[val]}
            FROM
                (SELECT * FROM {matrix_a} WHERE {a_args[val]} IS NOT NULL) AS a
            FULL OUTER JOIN
                (SELECT * FROM {matrix_b} WHERE {b_args[val]} IS NOT NULL) AS b
            ON
                a.{a_args[row]}::bigint = b.{b_args[row]}::bigint AND
                a.{a_args[col]}::bigint = b.{b_args[col]}::bigint
        ) t
        WHERE
            t.{out_args[val]}::float8 != 0
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({out_args[row]})')
        """.format(matrix_out=matrix_out, matrix_a=matrix_a, matrix_b=matrix_b,
                   a_args=a_args, b_args=b_args, out_args=out_args,
                   elem_ops=elem_ops))

    want_rows, want_cols = get_dims(matrix_a, a_args)
    got_rows, got_cols = get_dims(matrix_out, out_args)
    if (got_rows, got_cols) != (want_rows, want_cols):
        plpy.execute("""
            INSERT INTO {matrix_out} VALUES ({row_count}, {col_count}, 0)
            """.format(matrix_out=matrix_out,
                       row_count=want_rows,
                       col_count=want_cols))
def _matrix_elem_op_dense(schema_madlib, matrix_a, a_args,
                          matrix_b, b_args, matrix_out, out_args,
                          elem_ops):
    """
    Element-wise operation on two dense matrices.

    Rows of matrix_a and matrix_b with equal row ids are paired. The SQL
    function schema_madlib.<elem_ops['vector_op']> is called on their float8
    arrays, and the result is stored with matrix_a's row id.
    """
    statement = """
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            a.{a_args[row]} AS {out_args[row]},
            {schema_madlib}.{elem_ops[vector_op]}(
                a.{a_args[val]}::float8[],
                b.{b_args[val]}::float8[]) AS {out_args[val]}
        FROM
            {matrix_a} AS a,
            {matrix_b} AS b
        WHERE
            a.{a_args[row]}::bigint = b.{b_args[row]}::bigint
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({out_args[row]})')
        """
    plpy.execute(statement.format(matrix_out=matrix_out,
                                  schema_madlib=schema_madlib,
                                  matrix_a=matrix_a, a_args=a_args,
                                  matrix_b=matrix_b, b_args=b_args,
                                  out_args=out_args, elem_ops=elem_ops))
# ------------------------------------------------------------------------------
def matrix_add(schema_madlib, matrix_a, a_args,
               matrix_b, b_args, matrix_out, out_args):
    """Element-wise sum of matrix_a and matrix_b, stored in matrix_out."""
    matrix_elem_op(schema_madlib, matrix_a, a_args,
                   matrix_b, b_args, matrix_out, out_args,
                   elem_ops=dict(scalar_op='+', vector_op='array_add'))
# ------------------------------------------------------------------------------
def matrix_sub(schema_madlib, matrix_a, a_args,
               matrix_b, b_args, matrix_out, out_args):
    """Element-wise difference matrix_a - matrix_b, stored in matrix_out."""
    matrix_elem_op(schema_madlib, matrix_a, a_args,
                   matrix_b, b_args, matrix_out, out_args,
                   elem_ops=dict(scalar_op='-', vector_op='array_sub'))
# ------------------------------------------------------------------------------
def matrix_extract_diag(schema_madlib, matrix_in, in_args):
    """ Return the main diagonal of a matrix.

    Args:
        @param schema_madlib: Name of the MADlib schema
        @param matrix_in: Str, Name of the table holding the matrix
        @param in_args: Str, Key-value string providing column names for matrix_in
                        The string is of the format "key1=value1, key2=value2 ..."
                        This is parsed to obtain a dictionary.
                        Alternatively this argument can also be a dictionary
                        containing the key-value information (in which case no
                        parsing is performed)

    Returns:
        The array_diag value computed by the format-specific helper: the
        diagonal entries aggregated in row-index order.

    Side effect:
        None; the input is validated but no table is created.
    """
    _validate_input_table(matrix_in)
    in_args = parse_matrix_args(in_args)
    if _is_sparse(matrix_in, in_args['val']):
        validate_sparse(matrix_in, in_args)
        return _matrix_extract_diag_sparse(schema_madlib, matrix_in, in_args)
    validate_dense(matrix_in, in_args)
    return _matrix_extract_diag_dense(schema_madlib, matrix_in, in_args)
def _matrix_extract_diag_sparse(schema_madlib, matrix_in, in_args):
    """Diagonal of a sparse matrix.

    Takes entries with row == col for positions 1..min(row_dim, col_dim),
    ordered by row. Positions without an entry, or with a NULL value,
    become 0.
    """
    n_rows, n_cols = get_dims(matrix_in, in_args)
    diag_len = min(n_rows, n_cols)
    query = """
        SELECT
            array_agg(val order by row) AS array_diag
        FROM (
            SELECT coalesce(val,0) AS val,
                   num AS row
            FROM (
                SELECT
                    {in_args[val]} AS val,
                    {in_args[row]}::BIGINT AS row
                FROM
                    {matrix_in}
                WHERE
                    {in_args[row]} = {in_args[col]}
            ) AS t1
            RIGHT OUTER JOIN
                generate_series(1, {out_num}) num
            ON t1.row = num
        ) t2
        """.format(in_args=in_args, matrix_in=matrix_in, out_num=diag_len)
    first_row = plpy.execute(query)[0]
    return first_row['array_diag']
# ------------------------------------------------------------------------------
def _matrix_extract_diag_dense(schema_madlib, matrix_in, in_args):
    """Diagonal of a dense matrix.

    For each row id r in 1..min(row_dim, col_dim), takes element r of that
    row's array, ordered by row id. A NULL element becomes 0, as in the
    sparse variant. Previously NULL elements were filtered out, which
    shortened the result and shifted every later diagonal entry.
    """
    in_dim = get_dims(matrix_in, in_args)
    out_num = min(in_dim[0], in_dim[1])
    res = plpy.execute("""
        SELECT
            array_agg(val order by row) AS array_diag
        FROM (
            SELECT
                coalesce({in_args[val]}[{in_args[row]}], 0) AS val,
                {in_args[row]}::BIGINT AS row
            FROM
                {matrix_in}
            WHERE
                {in_args[row]} <= {out_num}
        ) AS t2
        """.format(in_args=in_args, matrix_in=matrix_in,
                   out_num=out_num))[0]['array_diag']
    return res
# ------------------------------------------------------------------------------
def matrix_elem_mult(schema_madlib, matrix_a, a_args,
                     matrix_b, b_args, matrix_out, out_args):
    """Element-wise (Hadamard) product of matrix_a and matrix_b, stored in
    matrix_out."""
    matrix_elem_op(schema_madlib, matrix_a, a_args,
                   matrix_b, b_args, matrix_out, out_args,
                   elem_ops=dict(scalar_op='*', vector_op='array_mult'))
# ------------------------------------------------------------------------------
# -- Block operations ------------------------------------------------------
# ------------------------------------------------------------------------------
def matrix_block_mult(schema_madlib, matrix_a, a_args,
                      matrix_b, b_args,
                      matrix_out, out_args):
    """
    Multiply two block matrices: out[i, j] = sum_k a[i, k] * b[k, j].

    Blocks of matrix_a with block column k are paired with blocks of
    matrix_b with block row k. Each pair is multiplied with matrix_mem_mult,
    and the products are summed per (i, j) with __matrix_mem_sum.

    Args:
        @param schema_madlib: str, schema holding the MADlib functions
        @param matrix_a: str, left block matrix
        @param a_args: str or dict, argument specification for matrix_a
        @param matrix_b: str, right block matrix
        @param b_args: str or dict, argument specification for matrix_b
        @param matrix_out: str, block matrix table to create
        @param out_args: str or dict, output specification; row/col/val
                         default to those of matrix_a
    """
    _validate_input_table(matrix_a)
    _validate_input_table(matrix_b)
    _validate_output_table(matrix_out)
    a_args = parse_matrix_args(a_args)
    b_args = parse_matrix_args(b_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': a_args['row'],
                                                  'col': a_args['col'],
                                                  'val': a_args['val']})
    _validate_block(matrix_a, a_args)
    _validate_block(matrix_b, b_args)
    # The right operand's blocks must be read through b_args; using a_args
    # here broke whenever the two tables name their block column differently.
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            row as {out_args[row]},
            col as {out_args[col]},
            {schema_madlib}.__matrix_mem_sum(block::float8[]) AS {out_args[val]}
        FROM
        (
            SELECT
                a.{a_args[row]} AS row,
                b.{b_args[col]} AS col,
                {schema_madlib}.matrix_mem_mult(a.{a_args[val]}::float8[],
                                                b.{b_args[val]}::float8[]) AS block
            FROM
                {matrix_a} AS a JOIN {matrix_b} AS b
                ON (a.{a_args[col]}::bigint = b.{b_args[row]}::bigint)
        ) t1
        GROUP BY row, col
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({out_args[row]})')
        """.format(matrix_out=matrix_out, schema_madlib=schema_madlib,
                   matrix_a=matrix_a, a_args=a_args,
                   matrix_b=matrix_b, b_args=b_args,
                   out_args=out_args))
def matrix_block_square(schema_madlib, matrix_in, in_args,
                        matrix_out, out_args):
    """Square a block matrix, i.e. multiply matrix_in by itself."""
    matrix_block_mult(schema_madlib,
                      matrix_a=matrix_in, a_args=in_args,
                      matrix_b=matrix_in, b_args=in_args,
                      matrix_out=matrix_out, out_args=out_args)
def matrix_block_trans(schema_madlib, matrix_in, in_args, matrix_out, out_args):
    """
    Transpose a block matrix.

    Block (i, j) of matrix_in becomes block (j, i) of matrix_out, and its
    contents are transposed with matrix_mem_trans.
    """
    parsed_in = parse_matrix_args(in_args)
    parsed_out = parse_matrix_args(
        out_args,
        in_default_args=dict((key, parsed_in[key])
                             for key in ('row', 'col', 'val')))
    _validate_input_table(matrix_in)
    _validate_block(matrix_in, parsed_in)
    _validate_output_table(matrix_out)
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {in_args[row]} AS {out_args[col]},
            {in_args[col]} AS {out_args[row]},
            {schema_madlib}.matrix_mem_trans({in_args[val]}::float8[]) AS {out_args[val]}
        FROM
            {matrix_in}
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({out_args[col]})')
        """.format(matrix_out=matrix_out, matrix_in=matrix_in,
                   schema_madlib=schema_madlib,
                   in_args=parsed_in, out_args=parsed_out))
def matrix_blockize(schema_madlib, matrix_in, in_args,
                    row_dim, col_dim, matrix_out, out_args):
    """
    Split a dense matrix into blocks of row_dim x col_dim elements.

    Row r goes to block row ((r - 1) / row_dim) + 1 (integer division). Each
    row array is split into chunks of col_dim elements by
    __matrix_row_split. The last block row holds the residual
    (total_row % row_dim) rows when the row count is not a multiple of
    row_dim. Blocks are assembled with __matrix_blockize_agg.

    Args:
        @param schema_madlib: str, schema holding the MADlib functions
        @param matrix_in: str, dense input table
        @param in_args: str or dict, argument specification for matrix_in
        @param row_dim: int, rows per block (> 0)
        @param col_dim: int, columns per block (> 0)
        @param matrix_out: str, block matrix table to create
        @param out_args: str or dict, output specification; row/val default
                         to those of matrix_in
    """
    _assert(row_dim > 0 and col_dim > 0, 'Matrix error: invalid block dimension')
    _assert(row_dim * col_dim < sys.maxint,
            "Matrix error: Block size requested ({0}) is "
            "too large".format(row_dim * col_dim))
    _validate_input_table(matrix_in)
    # matrix_out is guaranteed not to exist from here on, so no DROP is needed
    _validate_output_table(matrix_out)
    in_args = parse_matrix_args(in_args)
    validate_dense(matrix_in, in_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': in_args['row'],
                                                  'val': in_args['val']})

    rv = plpy.execute('SELECT count(*) AS total_row FROM ' + matrix_in)
    total_row = rv[0]['total_row']
    residual = total_row % row_dim
    # Explicit floor division: number of rows covered by full block rows
    border_row = (total_row // row_dim) * row_dim

    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            brow_id AS {out_args[row]},
            bcol_id AS {out_args[col]},
            {schema_madlib}.__matrix_blockize_agg(
                row_id::integer,
                row_vec::float8[],
                row_dim::integer) AS {out_args[val]}
        FROM
        (
            SELECT
                {in_args[row]} as row_id,
                (({in_args[row]}::bigint - 1) / {row_dim}::bigint) + 1 AS brow_id,
                CASE WHEN {in_args[row]}::bigint <= {border_row}
                    THEN {row_dim}
                    ELSE {residual}
                END AS row_dim,
                {schema_madlib}.__matrix_row_split({in_args[val]}::float8[],
                                                   {col_dim}::integer) AS row_vec,
                generate_series(1,
                                ceil((array_upper({in_args[val]}, 1) -
                                      array_lower({in_args[val]}, 1) + 1
                                     )::FLOAT8 / {col_dim})::integer
                               ) as bcol_id
            FROM
                {matrix_in}
        ) t1
        GROUP BY brow_id, bcol_id
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({out_args[row]})')
        """.format(matrix_out=matrix_out, matrix_in=matrix_in,
                   schema_madlib=schema_madlib,
                   in_args=in_args, out_args=out_args,
                   row_dim=row_dim, col_dim=col_dim,
                   border_row=border_row, residual=residual))
def matrix_unblockize(schema_madlib, matrix_in, in_args, matrix_out, out_args):
    """
    Convert a block matrix back into the dense (row id, array) format.

    Block sizes are read from block (1, 1). The width of the last block
    column is read from block (1, max col id). Each block is unnested into
    row arrays with __matrix_unnest_block, and the pieces of every matrix row
    are concatenated with __matrix_unblockize_agg.

    Args:
        @param schema_madlib: str, schema holding the MADlib functions
        @param matrix_in: str, block matrix table
        @param in_args: str or dict, argument specification for matrix_in
        @param matrix_out: str, dense table to create
        @param out_args: str or dict, output specification; row/val default
                         to those of matrix_in. The value column of the output
                         is named by out_args['val'].
    """
    in_args = parse_matrix_args(in_args)
    _validate_block(matrix_in, in_args)
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': in_args['row'],
                                                  'val': in_args['val']})
    rv = plpy.execute("""
        SELECT
            array_upper({in_args[val]}, 1) - array_lower({in_args[val]}, 1) + 1 AS row_dim,
            array_upper({in_args[val]}, 2) - array_lower({in_args[val]}, 2) + 1 AS col_dim
        FROM
            {matrix_in}
        WHERE
            {in_args[row]} = 1 and {in_args[col]} = 1
        """.format(matrix_in=matrix_in, in_args=in_args))
    row_dim = rv[0]['row_dim']
    col_dim = rv[0]['col_dim']

    rv = plpy.execute("""
        SELECT max({in_args[col]}::bigint) AS max_colid
        FROM {matrix_in}
        WHERE {in_args[row]}::bigint = 1
        """.format(matrix_in=matrix_in, in_args=in_args))
    max_colid = rv[0]['max_colid']

    rv = plpy.execute("""
        SELECT
            array_upper({in_args[val]}, 2) -
            array_lower({in_args[val]}, 2) + 1 AS col_residual
        FROM
            {matrix_in}
        WHERE
            {in_args[row]}::bigint = 1 and {in_args[col]}::bigint = {max_colid}
        """.format(matrix_in=matrix_in, in_args=in_args, max_colid=max_colid))
    col_residual = rv[0]['col_residual']
    total_col_dim = (max_colid - 1) * col_dim + col_residual

    # The aggregated array is named by out_args[val]; it used to be hard-coded
    # as row_vec, which ignored the caller's output specification.
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            row as {out_args[row]},
            {schema_madlib}.__matrix_unblockize_agg(
                {total_col_dim}::integer,
                ((col-1) * {col_dim} + 1)::integer,
                row_vec::float8[]) AS {out_args[val]}
        FROM
        (
            SELECT
                ({in_args[row]}::bigint - 1) * {row_dim}::bigint +
                    generate_series(1,
                                    array_upper({in_args[val]}::float8[], 1) -
                                    array_lower({in_args[val]}::float8[], 1) + 1) AS row,
                {in_args[col]} AS col,
                {schema_madlib}.__matrix_unnest_block({in_args[val]}::float8[]) AS row_vec
            FROM
                {matrix_in}
        ) t1
        GROUP BY
            row
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({out_args[row]})')
        """.format(matrix_out=matrix_out, matrix_in=matrix_in,
                   schema_madlib=schema_madlib,
                   in_args=in_args, out_args=out_args,
                   total_col_dim=total_col_dim, col_dim=col_dim,
                   row_dim=row_dim))
# ------------------------------------------------------------------------------
# -- Visitor operations --------------------------------------------------------
# ------------------------------------------------------------------------------
def matrix_extract(schema_madlib, matrix_in, in_args, extract_dim, index=1):
    """ Extracts wanted row or column from matrix.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Name of the table containing matrix
        @param in_args: str, Name-value pair string containing options for matrix_in
        @param extract_dim: int, If 1 extract row. If 2, extract col
        @param index: int, The index to be extracted, starting from 1 and default is 1.

    Returns:
        Vector of elements of the wanted row or column.

    Side effect:
        None. Invalid arguments are reported through _assert.
    """
    _validate_input_table(matrix_in)
    in_args = parse_matrix_args(in_args)
    # extract_dim indexes in_dim below and selects the row/column code path,
    # so reject anything other than 1 or 2 up front (0 would otherwise behave
    # like 2 and 3 would raise an IndexError).
    _assert(extract_dim == 1 or extract_dim == 2,
            "Matrix error: Invalid value for dimension ({0}). "
            "The dimension should be 1 for row and 2 for column".format(extract_dim))
    in_dim = get_dims(matrix_in, in_args)
    _assert(min(in_dim) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    _assert(index > 0, "Matrix error: The index ({0}) should be positive".format(index))
    _assert(in_dim[extract_dim - 1] >= index,
            "Matrix error: The index ({0}) is invalid compared to "
            "dimension size ({1})".format(index, in_dim[extract_dim - 1]))

    if _is_sparse(matrix_in, in_args['val']):
        validate_sparse(matrix_in, in_args)
        return _matrix_extract_sparse(schema_madlib, matrix_in, in_args,
                                      extract_dim, in_dim, index)

    validate_dense(matrix_in, in_args, in_dim[0])
    if extract_dim == 1:
        return _matrix_extract_row_dense(schema_madlib, matrix_in, in_args, index)
    return _matrix_extract_col_dense(schema_madlib, matrix_in, in_args, index)
def _matrix_extract_sparse(schema_madlib, matrix_in, in_args,
                           dim, dim_sizes, index=1):
    """
    Pull a single row (dim = 1) or column (dim = 2) out of a sparse matrix.

    The result is a dense FLOAT8 array ordered along the other dimension;
    positions with no stored non-NULL entry are reported as 0.

    Args:
        @param dim: 1 to extract a row, 2 to extract a column
        @param dim_sizes: (number of rows, number of columns) of matrix_in
        @param index: 1-based position of the row/column to extract
    Returns:
        List of values of the requested row/column.
    """
    fixed_key = ('row', 'col')[dim - 1]    # dimension pinned to 'index'
    free_key = ('col', 'row')[dim - 1]     # dimension the result runs along
    free_size = dim_sizes[0 if free_key == 'row' else 1]

    query = """
        SELECT
            array_agg(val order by {o_dim}) as res
        FROM (
            SELECT
                o_dim_values as {o_dim},
                coalesce(a.val::FLOAT8, 0::FLOAT8) AS val
            FROM (
                SELECT {in_args[val]}::FLOAT8 as val,
                    {o_dim}::BIGINT as {o_dim}
                FROM
                    {matrix_in} m
                WHERE
                    {in_args[val]} IS NOT NULL AND
                    {e_dim}::BIGINT = {index}::BIGINT
            ) AS a
            RIGHT JOIN
                generate_series(1, {o_dim_size}) o_dim_values
            ON
                a.{o_dim}::bigint = o_dim_values::bigint
        ) t
        """.format(e_dim=in_args[fixed_key],
                   o_dim=in_args[free_key],
                   o_dim_size=free_size,
                   in_args=in_args,
                   matrix_in=matrix_in,
                   index=index)
    result = plpy.execute(query)
    _assert(result and len(result) == 1,
            "Matrix error: Invalid index ({0}) for matrix {1}".format(index, matrix_in))
    return result[0]['res']
def _matrix_extract_row_dense(schema_madlib, matrix_in, in_args, index=1):
    """
    Return the value array stored for row 'index' of a dense matrix.

    The query asserts on the server that the row holds no NULL element; the
    Python side then requires exactly one matching row.
    """
    sql = """
        SELECT
            m.{in_args[val]} AS res,
            {schema_madlib}.assert(not {schema_madlib}.array_contains_null(m.{in_args[val]}),
                'Matrix error: Element in dense matrix should not be NULL')
        FROM
            {matrix_in} AS m
        WHERE
            m.{in_args[row]}::BIGINT = {index}::BIGINT
        """
    matched = plpy.execute(sql.format(schema_madlib=schema_madlib,
                                      matrix_in=matrix_in,
                                      in_args=in_args,
                                      index=index))
    _assert(len(matched) == 1,
            "Matrix error: Invalid row index ({0}) for matrix {1}".format(index, matrix_in))
    return matched[0]['res']
def _matrix_extract_col_dense(schema_madlib, matrix_in, in_args, index=1):
    """
    Return column 'index' of a dense matrix as an array ordered by row id.

    The query asserts on the server that none of the extracted elements is
    NULL.
    """
    r = plpy.execute("""
        SELECT
            array_agg(val order by row) as res
        FROM (
            SELECT
                {in_args[row]}::BIGINT AS row,
                {in_args[val]}[{index}] AS val,
                {schema_madlib}.assert({in_args[val]}[{index}] is not NULL,
                    'Matrix error: Element in dense matrix should not be NULL')
            FROM
                {matrix_in}
        ) t
        """.format(schema_madlib=schema_madlib,
                   matrix_in=matrix_in,
                   in_args=in_args,
                   index=index))
    # 'index' selects a column here, so the message must say so.
    _assert(len(r) == 1,
            "Matrix error: Invalid column index ({0}) for matrix {1}".format(index, matrix_in))
    return r[0]['res']
# ------------------------------------------------------------------------------
# -- Extreme value operations --------------------------------------------------
# ------------------------------------------------------------------------------
def matrix_extremum(schema_madlib, matrix_in, in_args, dim, matrix_out, op,
                    fetch_index=False):
    """ Extracts wanted extremum value(max/min) from matrix.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Name of the table containing matrix
        @param in_args: str, Name-value pair string containing options for matrix_in
        @param dim: int, Along which dimension you want to get extremum value.
                    dim = 1 implies compute extremum for each column
                    dim = 2 implies compute extremum for each row
        @param matrix_out: str, Result matrix table with a vector column storing extremum values.
                           If fetch_index is True, contains another index column for
                           first occurrence indices of the extremum value.
        @param op: str, Can be either 'max' or 'min' (case-insensitive)
        @param fetch_index: bool, If true, fetch first occurrence indices of extremum value.

    Returns:
        None

    Side effect:
        Creates an output table containing the result max/min values as well as
        their first occurrence indices if wanted. Invalid arguments are
        reported through _assert.
    """
    _validate_input_table(matrix_in)
    in_args = parse_matrix_args(in_args)
    in_dim = get_dims(matrix_in, in_args)
    op = op.lower()
    # Downstream helpers treat every value other than 'min' as 'max', so an
    # unknown operation must be rejected here.
    _assert(op in ('max', 'min'),
            "Matrix error: Invalid operation ({0}). "
            "It should be either 'max' or 'min'".format(op))
    _assert(min(in_dim) > 0, "Matrix error: Invalid dimensions for input matrix")
    _assert(dim == 1 or dim == 2,
            "Matrix error: Invalid dimensionality: {0}. It should be either 1 or 2".format(dim))

    if _is_sparse(matrix_in, in_args['val']):
        validate_sparse(matrix_in, in_args)
        _matrix_extremum_sparse(schema_madlib, matrix_in, in_args,
                                matrix_out, fetch_index,
                                dim, in_dim, op)
    else:
        validate_dense(matrix_in, in_args, in_dim[0])
        if dim == 2:
            _matrix_row_extremum_dense(schema_madlib, matrix_in, in_args, matrix_out,
                                       fetch_index, op)
        else:
            _matrix_col_extremum_dense(schema_madlib, matrix_in, in_args, matrix_out,
                                       fetch_index, in_dim[1], op)
def _matrix_extremum_sparse(schema_madlib, matrix_in, in_args, matrix_out,
                            fetch_index, dim, dim_sizes, op):
    """
    Create matrix_out with the max/min of a sparse matrix along one dimension.

    Args:
        @param dim: Dimension identifier - can only be 1 or 2
                    1 = compute extremum for each column (i.e. aggregate over rows)
                    2 = compute extremum for each row (i.e. aggregate over columns)
        @param dim_sizes: The number of elements in each dimension (size must be 2)
        @param op: 'min' selects the minimum; any other value the maximum
    Returns:
        None
    Side effect:
        Creates matrix_out with a single row: an array of extremum values
        ordered along the orthogonal dimension and, if fetch_index, an
        'index' array with the position of each extremum.
    """
    # a_dim = aggregated dimension, the dimension across which we compute the
    #         extremum (it collapses to a single element in the result)
    # o_dim = orthogonal dimension, the dimension the result array runs along
    #
    # dim = 1: aggregate across rows (a_dim = row) -> one value per column
    # dim = 2: aggregate across cols (a_dim = col) -> one value per row
    a_dim = ('row', 'col')[dim - 1]
    a_dim = in_args[a_dim]
    o_dim = ('col', 'row')[dim - 1]
    o_dim_size = dim_sizes[0 if o_dim == 'row' else 1]
    a_dim_size = dim_sizes[1 if o_dim == 'row' else 0]
    o_dim = in_args[o_dim]

    agg_index = ''
    if fetch_index:
        agg_index = "array_agg({a_dim} order by {o_dim}) as index,".format(
            a_dim=a_dim, o_dim=o_dim)
    if op == 'min':
        extremum_name = 'min'
        vector_func = 'array_min_index'
    else:
        extremum_name = 'max'
        vector_func = 'array_max_index'

    # Each stored slice along o_dim is densified to a_dim_size elements
    # (missing entries presumably become 0) and reduced by vector_func, whose
    # result is read as [value, position]. The RIGHT OUTER JOIN against
    # generate_series adds the slices that have no stored entries at all.
    # Their position must come from the series (b): i.{o_dim} is NULL for
    # them, and ordering by a NULL would push them to the end of the array.
    # NOTE(review): such slices report position 0; confirm that is intended.
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {agg_index}
            array_agg({in_args[val]} order by {o_dim}) as {extremum_name}
        FROM (
            SELECT
                b AS {o_dim},
                coalesce({in_args[val]}[2]::BIGINT, 0) as {a_dim},
                coalesce({in_args[val]}[1], 0) as {in_args[val]}
            FROM (
                SELECT
                    {o_dim}::BIGINT as {o_dim},
                    {schema_madlib}.{vector_func}(
                        {schema_madlib}.__matrix_densify_agg(
                            {a_dim_size}::INTEGER,
                            {a_dim}::INTEGER,
                            {in_args[val]}::FLOAT8))
                    AS {in_args[val]}
                FROM
                    {matrix_in}
                WHERE
                    {in_args[val]} IS NOT NULL
                GROUP BY
                    {o_dim}) i
            RIGHT OUTER JOIN
                generate_series(1, {o_dim_size}) b
            ON
                i.{o_dim} = b) m
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({extremum_name})')
        """.format(matrix_out=matrix_out,
                   agg_index=agg_index,
                   in_args=in_args,
                   o_dim=o_dim,
                   a_dim=a_dim,
                   extremum_name=extremum_name,
                   schema_madlib=schema_madlib,
                   vector_func=vector_func,
                   a_dim_size=a_dim_size,
                   o_dim_size=o_dim_size,
                   matrix_in=matrix_in))
def _matrix_col_extremum_dense(schema_madlib, matrix_in, in_args, matrix_out,
                               fetch_index, col_count, op):
    """
    Create matrix_out with the per-column max/min of a dense matrix.

    Every dense row is unnested into (row, col, val) triples. A row_number()
    window ranks the values within each column (ties broken by the smaller
    row id), and only the top-ranked triple per column is kept.

    Args:
        @param col_count: number of columns (length of each dense row array)
        @param op: 'min' for the minimum; any other value gives the maximum
    """
    if op == 'min':
        extremum_name, sort_order = 'min', 'asc'
    else:
        extremum_name, sort_order = 'max', 'desc'
    agg_index = "array_agg(row order by col) as index," if fetch_index else ''

    template = """
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {agg_index}
            array_agg(val order by col) as {extremum_name}
        FROM
            (SELECT
                row,
                col,
                val,
                row_number() over (partition by col order by val {sort_order}, row) as rn
            FROM
                (SELECT
                    unnest({in_args[val]}) as val,
                    {in_args[row]}::BIGINT as row,
                    generate_series(1, {col_count}) as col
                FROM
                    {matrix_in}) m) t
        WHERE
            rn = 1
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({extremum_name})')
        """
    plpy.execute(template.format(matrix_out=matrix_out,
                                 agg_index=agg_index,
                                 extremum_name=extremum_name,
                                 sort_order=sort_order,
                                 in_args=in_args,
                                 col_count=col_count,
                                 matrix_in=matrix_in))
def _matrix_row_extremum_dense(schema_madlib, matrix_in, in_args, matrix_out, fetch_index, op):
    """
    Create matrix_out with the per-row max/min of a dense matrix.

    schema_madlib.array_min_index / array_max_index is applied to each row
    array. Element [1] of its result is used as the extremum value and
    element [2] as its position.

    Args:
        @param fetch_index: if true, also store an 'index' array of positions
        @param op: 'min' for the minimum; any other value gives the maximum
    """
    agg_index = ''
    if fetch_index:
        agg_index = 'array_agg(value_index[2] order by row) as index,'
    if op == 'min':
        extremum_name = 'min'
        vector_func_name = 'array_min_index'
    else:
        extremum_name = 'max'
        vector_func_name = 'array_max_index'

    # NOTE(review): rows whose position (value_index[2]) is 0 are filtered
    # out; presumably 0 means no extremum was found - confirm against the
    # array_*_index implementation.
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {agg_index}
            array_agg(value_index[1] order by row) as {extremum_name}
        FROM
            (SELECT
                {in_args[row]}::BIGINT as row,
                {schema_madlib}.{vector_func_name}({in_args[val]}) as value_index
            FROM
                {matrix_in}) m
        WHERE
            value_index[2] <> 0
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({extremum_name})')
        """.format(matrix_out=matrix_out,
                   agg_index=agg_index,
                   extremum_name=extremum_name,
                   in_args=in_args,
                   schema_madlib=schema_madlib,
                   vector_func_name=vector_func_name,
                   matrix_in=matrix_in))
# ------------------------------------------------------------------------------
def matrix_max(schema_madlib, matrix_in, in_args, dim, matrix_out,
               fetch_index=False):
    """ Create matrix_out holding the maxima of matrix_in along dim.

    Thin wrapper around matrix_extremum with op = 'max'; see that function
    for the meaning of the arguments.
    """
    matrix_extremum(schema_madlib, matrix_in, in_args, dim, matrix_out,
                    op='max', fetch_index=fetch_index)
# ------------------------------------------------------------------------------
def matrix_min(schema_madlib, matrix_in, in_args, dim, matrix_out,
               fetch_index=False):
    """ Create matrix_out holding the minima of matrix_in along dim.

    Thin wrapper around matrix_extremum with op = 'min'; see that function
    for the meaning of the arguments.
    """
    matrix_extremum(schema_madlib, matrix_in, in_args, dim, matrix_out,
                    op='min', fetch_index=fetch_index)
# ------------------------------------------------------------------------------
# -- Reduction operations ------------------------------------------------------
# ------------------------------------------------------------------------------
def _matrix_reduction_op_sparse(schema_madlib, matrix_in, in_args,
                                dim, in_dims, expr):
    """
    Reduce a sparse matrix along one dimension with a SQL aggregate expression.

    Args:
        @param dim: 1 to reduce each column (aggregating over rows),
                    2 to reduce each row (aggregating over columns)
        @param in_dims: (number of rows, number of columns) of matrix_in
        @param expr: SQL aggregate expression over a column named 'value'.
                     It is formatted with this function's locals, so it may
                     reference e.g. {normalizer}.
    Returns:
        Array of reduced values ordered by the index of the kept dimension.
    """
    # Local variables (keep these names: expr is formatted with **locals()):
    #   a_dim      - column of the dimension that is KEPT in the result and
    #                used as the GROUP BY key: col for dim = 1, row for dim = 2
    #   n_entries  - number of elements along a_dim, i.e. the result length
    #   normalizer - number of elements along the aggregated dimension; used
    #                by reductions that compute averages
    if dim == 1:
        a_dim = in_args['col']
        n_entries = in_dims[1]
        normalizer = in_dims[0]
    else:
        a_dim = in_args['row']
        n_entries = in_dims[0]
        normalizer = in_dims[1]
    expr = expr.format(**locals())
    # The RIGHT OUTER JOIN with generate_series guarantees one group per
    # position along a_dim; positions without stored entries contribute 0.
    r = plpy.execute("""
        SELECT
            array_agg(op_val order by {a_dim}) as res
        FROM (
            SELECT
                {a_dim},
                {expr} as op_val
            FROM
                (SELECT
                    b as {a_dim},
                    coalesce(m.{in_args[val]}::FLOAT8, 0::FLOAT8) as value
                FROM
                    {matrix_in} m
                RIGHT OUTER JOIN
                    generate_series(1, {n_entries}) b
                ON m.{a_dim}::bigint = b::bigint) t
            GROUP BY {a_dim}
        ) op_t
        """.format(**locals()))
    _assert(len(r) == 1,
            "Matrix error: Bad matrix dimension, "
            "row:{row}, column:{col}".format(row=in_dims[0], col=in_dims[1]))
    return r[0]['res']
def _matrix_row_reduction_op_dense(schema_madlib, matrix_in, in_args,
                                   vector_op):
    """
    Apply the array function schema_madlib.vector_op to every row of a
    dense matrix and collect the per-row results in row-id order.
    """
    template = """
        SELECT
            array_agg( {schema_madlib}.{vector_op}( {in_args[val]}::FLOAT8[] )
                       order by {in_args[row]}::BIGINT ) as res
        FROM
            {matrix_in} m
        """
    rows = plpy.execute(template.format(schema_madlib=schema_madlib,
                                        vector_op=vector_op,
                                        in_args=in_args,
                                        matrix_in=matrix_in))
    _assert(len(rows) == 1,
            "Matrix error: Invalid values in value column, " + in_args['val'])
    return rows[0]['res']
def _matrix_col_reduction_op_dense(schema_madlib, matrix_in, in_args,
                                   col_count, vector_op):
    """
    Apply the SQL aggregate vector_op to every column of a dense matrix.

    Row arrays are unnested alongside generate_series(1, col_count) to tag
    each element with its column id, then grouped by that id.
    """
    template = """
        SELECT
            array_agg(val order by col) as res
        FROM (
            SELECT
                col,
                {vector_op}(val::float8) as val
            FROM
                (SELECT
                    unnest({in_args[val]}) as val,
                    {in_args[row]}::BIGINT as row,
                    generate_series(1, {col_count}) as col
                FROM
                    {matrix_in}) m
            GROUP BY
                col
        ) t
        """
    rows = plpy.execute(template.format(vector_op=vector_op,
                                        in_args=in_args,
                                        col_count=col_count,
                                        matrix_in=matrix_in))
    _assert(len(rows) == 1,
            "Matrix error: Invalid values in value column, " + in_args['val'])
    return rows[0]['res']
def matrix_reduction_op(schema_madlib, matrix_in, in_args, dim, vector_ops):
    """ Shared driver for reductions (sum, mean, ...) along one dimension.

    Args:
        @param dim: 1 to reduce each column, 2 to reduce each row
        @param vector_ops: dict with the operation in three forms:
            'sparse_agg_expr' - SQL aggregate expression over a column named
                                value, used for sparse input
            'dense_agg_op'    - SQL aggregate applied per column of dense input
            'dense_array_op'  - function in schema_madlib applied to each
                                row array of dense input
    Returns:
        Array of reduced values ordered along the kept dimension.
    """
    _validate_input_table(matrix_in)
    in_args = parse_matrix_args(in_args)
    dims = get_dims(matrix_in, in_args)
    _assert(min(dims) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    _assert(dim == 1 or dim == 2,
            "Matrix error: Invalid value for dimension ({0}). "
            "The dimension should be 1 for column and 2 for row".format(dim))

    if not _is_sparse(matrix_in, in_args['val']):
        validate_dense(matrix_in, in_args, dims[0])
        if dim == 1:
            return _matrix_col_reduction_op_dense(schema_madlib, matrix_in,
                                                  in_args, dims[1],
                                                  vector_ops['dense_agg_op'])
        return _matrix_row_reduction_op_dense(schema_madlib, matrix_in,
                                              in_args,
                                              vector_ops['dense_array_op'])

    validate_sparse(matrix_in, in_args)
    return _matrix_reduction_op_sparse(schema_madlib, matrix_in,
                                       in_args, dim, dims,
                                       vector_ops['sparse_agg_expr'])
# ------------------------------------------------------------------------------
def matrix_sum(schema_madlib, matrix_in, in_args, dim):
    """ Calculate the sum along one dimension of a matrix.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Name of the table containing matrix
        @param in_args: str, Name-value pair string containing options for matrix_in
        @param dim: int, Dimension, 1 for column and 2 for row.
    Returns:
        Vector of sums, ordered along the kept dimension.
    Side effect:
        None
    """
    sum_ops = dict(sparse_agg_expr='sum(value)',
                   dense_agg_op='sum',
                   dense_array_op='array_sum_big')
    return matrix_reduction_op(schema_madlib, matrix_in, in_args, dim, sum_ops)
# ------------------------------------------------------------------------------
def matrix_mean(schema_madlib, matrix_in, in_args, dim):
    """ Calculate the mean along one dimension of a matrix.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Name of the table containing matrix
        @param in_args: str, Name-value pair string containing options for matrix_in
        @param dim: int, Dimension, 1 for column and 2 for row.
    Returns:
        Vector of means, ordered along the kept dimension.
    Side effect:
        None
    """
    # For sparse input, implicit zeros count towards the mean, hence the sum
    # is divided by the full length of the aggregated dimension.
    mean_ops = dict(sparse_agg_expr='sum(value)::float8/{normalizer}',
                    dense_agg_op='avg',
                    dense_array_op='array_mean')
    return matrix_reduction_op(schema_madlib, matrix_in, in_args, dim, mean_ops)
# ------------------------------------------------------------------------------
# -- Mathematical operations ------------------------------------------------------
# ------------------------------------------------------------------------------
def matrix_trans(schema_madlib, matrix_in, in_args,
                 matrix_out, out_args):
    """ Main function for transpose of matrix

    Args:
        @param schema_madlib: Name of the MADlib schema
        @param matrix_in: Str, Name of the matrix to be transposed
        @param in_args: Str, Key-value string providing column names for matrix_in
                        ("key1=value1, key2=value2 ..."), or an already parsed
                        dictionary with the same information
        @param matrix_out: Str, Name of the matrix to contain transposed result
        @param out_args: Str, Key-value string (or dictionary) providing column
                         names for matrix_out; an optional 'fmt' entry asks for
                         a 'sparse' or 'dense' result
    Returns:
        None
    Side effect:
        Creates an output table containing the transpose of matrix_in
    """
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)
    in_args = parse_matrix_args(in_args)
    defaults = dict((key, in_args[key]) for key in ('row', 'col', 'val'))
    out_args = parse_matrix_args(out_args, in_default_args=defaults)
    if 'fmt' in out_args:
        fmt = out_args['fmt']
    else:
        fmt = None

    is_input_sparse = _is_sparse(matrix_in, in_args['val'])
    if is_input_sparse:
        validate_func = validate_sparse
        trans_func = _matrix_trans_sparse
        change_func = matrix_densify
    else:
        validate_func = validate_dense
        trans_func = _matrix_trans_dense
        change_func = matrix_sparsify
    # Convert only when the caller explicitly requested the other format.
    change_fmt = fmt and fmt == ('sparse', 'dense')[is_input_sparse]
    validate_func(matrix_in, in_args)

    if not change_fmt:
        trans_func(schema_madlib, matrix_in, in_args, matrix_out, out_args)
        return
    staging = "pg_temp." + unique_string() + "_out1"
    trans_func(schema_madlib, matrix_in, in_args, staging, out_args)
    change_func(schema_madlib, staging, out_args, matrix_out, out_args)
    plpy.execute('DROP TABLE IF EXISTS %s' % staging)
def _matrix_trans_sparse(schema_madlib, matrix_in, in_args,
                         matrix_out, out_args):
    """
    Create matrix_out as the transpose of the sparse matrix matrix_in by
    swapping the row and column ids of every stored entry.
    """
    template = """
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {in_args[row]} AS {out_args[col]},
            {in_args[col]} AS {out_args[row]},
            {in_args[val]} AS {out_args[val]}
        FROM
            {matrix_in}
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({out_args[col]})')
        """
    substitutions = dict(matrix_in=matrix_in, in_args=in_args,
                         matrix_out=matrix_out, out_args=out_args)
    plpy.execute(template.format(**substitutions))
def _matrix_trans_dense(schema_madlib, matrix_in, in_args,
                        matrix_out, out_args):
    """
    Create matrix_out as the transpose of the dense matrix matrix_in.

    Each input row array is unnested into (row, col, val) triples, which are
    regrouped by column with __matrix_densify_agg. Every output row is
    therefore an array with row_dim elements.
    """
    (row_dim, col_dim) = get_dims(matrix_in, in_args)
    # Output arrays have row_dim elements of 8 bytes each; apply the same
    # 1GB allocation bound that _matrix_mult_dense uses for dense arrays.
    # (A sys.maxint bound would never trigger here and does not exist in
    # Python 3.)
    max_array_len = 1024 * 1024 * 1024 // 8
    _assert(row_dim < max_array_len,
            "Matrix error: Matrix {0} has too many rows. "
            "This cannot be transposed in a dense format due to "
            "restrictions on maximum array size.".format(matrix_in))
    temp_row, temp_col, temp_val = (unique_string() for i in range(3))
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {temp_col} AS {out_args[row]},
            {schema_madlib}.__matrix_densify_agg(
                {row_dim}::integer,
                {temp_row}::integer,
                {temp_val}) AS {out_args[val]}
        FROM
            ( SELECT {in_args[row]} as {temp_row},
                     unnest({in_args[val]}) AS {temp_val},
                     generate_series(1, {col_dim}) AS {temp_col}
              FROM
                {matrix_in}
            ) t1
        GROUP BY
            {temp_col}
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({out_args[row]})')
        """.format(matrix_out=matrix_out,
                   temp_col=temp_col,
                   temp_row=temp_row,
                   temp_val=temp_val,
                   out_args=out_args,
                   schema_madlib=schema_madlib,
                   row_dim=row_dim,
                   col_dim=col_dim,
                   in_args=in_args,
                   matrix_in=matrix_in))
# ------------------------------------------------------------------------------
def matrix_mult(schema_madlib, matrix_a, a_args,
                matrix_b, b_args, matrix_out, out_args=None):
    """ Multiply two matrices: matrix_out = op(A) * op(B).

    op(X) is X itself, or its transpose when the corresponding argument
    dictionary carries a true 'trans' entry.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_a: str, Name of the table containing the left matrix
        @param a_args: str or dict, Options for matrix_a (parsed with
                       parse_matrix_args; its 'trans' entry is read here)
        @param matrix_b: str, Name of the table containing the right matrix
        @param b_args: str or dict, Options for matrix_b (same format)
        @param matrix_out: str, Name of the output table to create
        @param out_args: str or dict, Column names for matrix_out (defaulting
                         to the row/col/val names of a_args) and an optional
                         'fmt' entry ('dense' or 'sparse'). Without 'fmt' the
                         result is written in dense format.
    Returns:
        None
    Side effect:
        Creates matrix_out. Temporary pg_temp tables created along the way are
        dropped at the end.
    """
    _validate_input_table(matrix_a)
    _validate_input_table(matrix_b)
    _validate_output_table(matrix_out)
    a_args = parse_matrix_args(a_args)
    b_args = parse_matrix_args(b_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': a_args['row'],
                                                  'col': a_args['col'],
                                                  'val': a_args['val']})
    # Temporary tables to drop once the result has been produced.
    to_drop = []
    # Materialize A^T up front when requested; from here on matrix_a names
    # the (possibly transposed) left operand.
    if a_args['trans']:
        matrix_trans_a = "pg_temp." + unique_string() + "_trans_a"
        matrix_trans(schema_madlib, matrix_a, a_args, matrix_trans_a, None)
        matrix_a = matrix_trans_a
        to_drop.append(matrix_trans_a)
    # we defer the transpose operation for matrix_b since dense multiplication will
    # require to transpose B. We should avoid double transpose.
    a_dim = get_dims(matrix_a, a_args)
    b_dim = get_dims(matrix_b, b_args)
    _assert(min(a_dim) > 0 and min(b_dim) > 0,
            "Matrix error: Invalid dimensions for input matrices")
    # B has not been transposed yet, so its effective row count is its
    # stored column count when 'trans' is set.
    left_dim = a_dim[1]
    right_dim = b_dim[0] if not b_args['trans'] else b_dim[1]
    _assert(left_dim == right_dim,
            "Matrix error: Dimension mismatch for matrix multiplication.\n"
            "Left matrix, col dimension = {0}, "
            "Right matrix, row dimension = {1}".format(left_dim, right_dim))
    fmt = out_args['fmt'] if 'fmt' in out_args else None
    # both matrix_a and matrix_b are dense
    if (not _is_sparse(matrix_a, a_args['val']) and
            not _is_sparse(matrix_b, b_args['val'])):
        validate_dense(matrix_a, a_args, row_dim=a_dim[0])
        validate_dense(matrix_b, b_args, row_dim=b_dim[0])
        # The dense kernel dots rows of A with rows of B^T; a stored matrix_b
        # flagged 'trans' already has that layout (is_b_transposed).
        if (fmt is None or fmt == 'dense'):
            _matrix_mult_dense(schema_madlib, matrix_a, a_args,
                               matrix_b, b_args, matrix_out, out_args,
                               is_b_transposed=b_args['trans'])
        else:
            # Sparse output requested: compute densely into a temp table,
            # then sparsify into matrix_out.
            matrix_temp = "pg_temp." + unique_string() + "_temp"
            _matrix_mult_dense(schema_madlib, matrix_a, a_args,
                               matrix_b, b_args, matrix_temp, _matrix_default_args(),
                               is_b_transposed=b_args['trans'])
            matrix_sparsify(schema_madlib, matrix_temp, _matrix_default_args(), matrix_out, out_args)
            to_drop.append(matrix_temp)
    else:
        # At least one operand is sparse: bring both to sparse format and use
        # the sparse kernel.
        # sparsify matrix_a if not sparse
        if not _is_sparse(matrix_a, a_args['val']):
            matrix_temp_a = "pg_temp." + unique_string() + "_temp_a"
            matrix_sparsify(schema_madlib, matrix_a, a_args,
                            matrix_temp_a, _matrix_default_args())
            matrix_a = matrix_temp_a
            a_args = _matrix_default_args()
            to_drop.append(matrix_a)
        else:
            validate_sparse(matrix_a, a_args)
        # sparsify matrix_b if not sparse
        if not _is_sparse(matrix_b, b_args['val']):
            matrix_temp_b = "pg_temp." + unique_string() + "_temp_b"
            # NOTE(review): b_args is kept (not reset to defaults as for A),
            # presumably so its 'trans' entry survives for the transpose
            # below; a 'col' name is filled in if it was missing.
            matrix_sparsify(schema_madlib, matrix_b, b_args, matrix_temp_b, None)
            matrix_b = matrix_temp_b
            to_drop.append(matrix_b)
            if 'col' not in b_args:
                b_args['col'] = _matrix_default_args()['col']
        else:
            validate_sparse(matrix_b, b_args)
        # use transpose of 'b' if specified by user
        if b_args['trans']:
            matrix_trans_b = "pg_temp." + unique_string() + "_trans_b"
            matrix_trans(schema_madlib, matrix_b, b_args, matrix_trans_b, None)
            matrix_b = matrix_trans_b
            to_drop.append(matrix_trans_b)
        is_output_sparse = fmt and fmt == 'sparse'
        # Write straight into matrix_out for sparse output; otherwise go
        # through a temp table that is densified afterwards.
        matrix_result = matrix_out if is_output_sparse else "pg_temp." + unique_string() + "_r"
        _matrix_mult_sparse(schema_madlib, matrix_a, a_args,
                            matrix_b, b_args,
                            matrix_result, _matrix_default_args())
        if not is_output_sparse:
            matrix_densify(schema_madlib,
                           matrix_result, _matrix_default_args(),
                           matrix_out, out_args)
            to_drop.append(matrix_result)
    for each_table in to_drop:
        plpy.execute('DROP TABLE IF EXISTS %s' % each_table)
def _matrix_mult_sparse(schema_madlib, matrix_a, a_args,
                        matrix_b, b_args, matrix_out, out_args):
    """
    Multiply two sparse matrices, writing the sparse product to matrix_out.

    Only stored entries that are non-NULL and non-zero take part in the join.
    If the product lacks an entry in its last row or last column, an explicit
    zero entry at (rows of A, columns of B) is appended so that the result's
    dimensions can still be read back.
    """
    n_rows = get_dims(matrix_a, a_args)[0]
    n_cols = get_dims(matrix_b, b_args)[1]
    alias_row = unique_string()
    alias_col = unique_string()
    alias_val = unique_string()

    product_sql = """
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {temp_row} as {out_args[row]},
            {temp_col} as {out_args[col]},
            sum({temp_val}) AS {out_args[val]}
        FROM
            (
                SELECT
                    a.{a_args[row]} AS {temp_row},
                    b.{b_args[col]} AS {temp_col},
                    a.{a_args[val]}::float8 * b.{b_args[val]}::float8 AS {temp_val}
                FROM
                    {matrix_a} AS a,
                    {matrix_b} AS b
                WHERE
                    a.{a_args[val]} IS NOT NULL AND a.{a_args[val]} != 0 AND
                    b.{b_args[val]} IS NOT NULL AND b.{b_args[val]} != 0 AND
                    a.{a_args[col]}::bigint = b.{b_args[row]}::bigint
            ) t
        GROUP BY
            {temp_row}, {temp_col}
        m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY ({out_args[row]})')
        """
    plpy.execute(product_sql.format(matrix_out=matrix_out,
                                    out_args=out_args,
                                    a_args=a_args,
                                    b_args=b_args,
                                    matrix_a=matrix_a,
                                    matrix_b=matrix_b,
                                    temp_row=alias_row,
                                    temp_col=alias_col,
                                    temp_val=alias_val))

    out_rows, out_cols = get_dims(matrix_out, out_args)
    if not (out_rows == n_rows and out_cols == n_cols):
        plpy.execute("""
            INSERT INTO {matrix_out} VALUES ({row_count}, {col_count}, 0)
            """.format(matrix_out=matrix_out,
                       row_count=n_rows,
                       col_count=n_cols))
def _matrix_mult_dense(schema_madlib, matrix_a, a_args,
                       matrix_b, b_args, matrix_out, out_args,
                       is_b_transposed=False):
    """
    Multiply two dense matrices, writing the dense product to matrix_out.

    Entry (i, j) of the product is the dot product of row i of matrix_a with
    row j of B^T. The entries are first stored in a temporary sparse table
    and then densified row by row.

    Args:
        @param is_b_transposed: if True, matrix_b already holds B^T and is
                                used as is; otherwise a temporary transpose
                                is created and dropped afterwards.
    """
    # Dot products need the columns of B as arrays, i.e. the rows of B^T.
    if is_b_transposed:
        matrix_b_trans = matrix_b
        b_trans_args = b_args
    else:
        matrix_b_trans = "pg_temp." + unique_string() + "_a6"
        b_trans_args = _matrix_default_args()
        _matrix_trans_dense(schema_madlib,
                            matrix_b, b_args,
                            matrix_b_trans, b_trans_args)

    b_trans_dim = get_dims(matrix_b_trans, b_trans_args)
    # Each output row is an array of 8-byte floats with b_trans_dim[0]
    # elements (one per column of B), bounded by the 1GB allocation limit.
    if b_trans_dim[0] >= (1024 * 1024 * 1024 // 8):
        plpy.error("Matrix error: Matrix {0} has too many columns. "
                   "This cannot be used in multiplication due to "
                   "restrictions on maximum array size.".format(matrix_b))

    # compute dot products in sparse format
    matrix_r_sparse = "pg_temp." + unique_string() + "_a7"
    r_s_args = _matrix_default_args()
    plpy.execute("""
        CREATE TABLE {matrix_r_sparse}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            a.{a_args[row]} AS {r_s_args[row]},
            b.{b_trans_args[row]} AS {r_s_args[col]},
            {schema_madlib}.array_dot(
                a.{a_args[val]}::float8[],
                b.{b_trans_args[val]}::float8[]) AS {r_s_args[val]}
        FROM
            {matrix_a} AS a,
            {matrix_b_trans} AS b
        m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY ({r_s_args[row]})')
        """.format(matrix_r_sparse=matrix_r_sparse,
                   a_args=a_args,
                   b_trans_args=b_trans_args,
                   r_s_args=r_s_args,
                   schema_madlib=schema_madlib,
                   matrix_a=matrix_a,
                   matrix_b_trans=matrix_b_trans))

    # densify result since it'll always be dense
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {r_s_args[row]} as {out_args[row]},
            {schema_madlib}.__matrix_densify_agg(
                {col_dim}::integer,
                {r_s_args[col]}::integer,
                {r_s_args[val]}::float8) AS {out_args[val]}
        FROM
            {matrix_r_sparse}
        GROUP BY
            {r_s_args[row]}
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({out_args[row]})')
        """.format(matrix_out=matrix_out,
                   r_s_args=r_s_args,
                   out_args=out_args,
                   schema_madlib=schema_madlib,
                   col_dim=b_trans_dim[0],
                   matrix_r_sparse=matrix_r_sparse))

    plpy.execute("DROP TABLE IF EXISTS " + matrix_r_sparse)
    if not is_b_transposed:
        plpy.execute("DROP TABLE IF EXISTS " + matrix_b_trans)
# ------------------------------------------------------------------------------
def matrix_square(schema_madlib, matrix_in, in_args,
                  matrix_out, out_args):
    """ Compute matrix_in * matrix_in for a square matrix.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Name of the table containing the square matrix
        @param in_args: str or dict, Options for matrix_in
        @param matrix_out: str, Name of the output table to create
        @param out_args: str or dict, Column names for matrix_out (defaulting to
                         those of in_args) and an optional 'fmt' ('sparse' or
                         'dense') requesting an output format
    Returns:
        None
    Side effect:
        Creates matrix_out containing the product.
    """
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)
    in_args = parse_matrix_args(in_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': in_args['row'],
                                                  'col': in_args['col'],
                                                  'val': in_args['val']})
    dim = get_dims(matrix_in, in_args)
    _assert(min(dim) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    _assert(dim[1] == dim[0],
            "Matrix error: Square operation is only defined for square matrices")
    fmt = None if 'fmt' not in out_args else out_args['fmt']

    is_input_sparse = _is_sparse(matrix_in, in_args['val'])
    validate_func = validate_sparse if is_input_sparse else validate_dense
    mult_func = _matrix_mult_sparse if is_input_sparse else _matrix_mult_dense
    # The product comes out in the input's format. When the other format is
    # requested it must be converted: a sparse product is densified and a
    # dense product is sparsified.
    change_func = matrix_densify if is_input_sparse else matrix_sparsify
    change_fmt = fmt and fmt == ('sparse', 'dense')[is_input_sparse]
    validate_func(matrix_in, in_args)

    matrix_r = "pg_temp." + unique_string() if change_fmt else matrix_out
    mult_func(schema_madlib, matrix_in, in_args,
              matrix_in, in_args, matrix_r, out_args)
    if change_fmt:
        change_func(schema_madlib, matrix_r, out_args, matrix_out, out_args)
        plpy.execute('DROP TABLE IF EXISTS %s' % matrix_r)
# ------------------------------------------------------------------------------
def _create_zero_matrix(schema_madlib, matrix_out, out_args, in_dim, fmt):
    """
    Create matrix_out as an all-zero matrix of shape in_dim.

    Sparse output (fmt None or 'sparse') is a single zero entry at
    (row_dim, col_dim) that records the dimensionality. Dense output holds
    one array_of_float(col_dim) per row id 1..row_dim.
    """
    row_dim, col_dim = in_dim[0], in_dim[1]
    if fmt is not None and fmt != 'sparse':
        # add multiple rows of all zero vectors
        template = """
            CREATE TABLE {matrix_out}
            m4_ifdef(`__POSTGRESQL__', `',
                `WITH (APPENDONLY=TRUE)') AS
            SELECT {out_args[row]},
                   {schema_madlib}.array_of_float({col_dim})::FLOAT8[] as {out_args[val]}
            FROM generate_series(1, {row_dim}) AS {out_args[row]}
            m4_ifdef(`__POSTGRESQL__', `',
                `DISTRIBUTED BY ({out_args[row]})');
            """
    else:
        # just add a single row with the dimensionality
        template = """
            CREATE TABLE {matrix_out}
            m4_ifdef(`__POSTGRESQL__', `',
                `WITH (APPENDONLY=TRUE)') AS
            SELECT {row_dim} AS {out_args[row]},
                   {col_dim} AS {out_args[col]},
                   0::FLOAT8 AS {out_args[val]}
            m4_ifdef(`__POSTGRESQL__', `',
                `DISTRIBUTED BY ({out_args[row]})');
            """
    plpy.execute(template.format(schema_madlib=schema_madlib,
                                 matrix_out=matrix_out,
                                 out_args=out_args,
                                 row_dim=row_dim,
                                 col_dim=col_dim))
def _matrix_scalar_mult_sparse_to_dense(schema_madlib, matrix_in, in_args,
                                        scalar, matrix_out, out_args, in_dim):
    """
    Multiply a sparse matrix by a scalar and store the result in dense format.

    Args:
        @param scalar: number interpolated into the SQL text
        @param in_dim: (number of rows, number of columns) of matrix_in
    Side effect:
        Creates matrix_out with one FLOAT8[] per row id 1..row_dim.
    """
    row_dim = in_dim[0]
    col_dim = in_dim[1]
    # Distribute by the output table's own row column: the input row column
    # name need not exist in matrix_out.
    plpy.execute("""
        CREATE TABLE {matrix_out}
            ({out_args[row]} BIGINT, {out_args[val]} FLOAT8[])
            m4_ifdef(`__POSTGRESQL__', `',
                `WITH (APPENDONLY=TRUE) DISTRIBUTED BY ({out_args[row]})');
        """.format(matrix_out=matrix_out, out_args=out_args))
    # The RIGHT OUTER JOIN with generate_series yields one group per row id.
    # Stored NULL values are excluded in the join condition, so a NULL value
    # can only come from a row id without entries; such a row contributes a
    # single 0 at position 1 (presumably a zero-filled array results).
    # Letting stored NULL entries through would write that 0 into position 1
    # of a non-empty row and could clobber its real first element.
    plpy.execute("""
        INSERT INTO {matrix_out}
        SELECT
            b AS {out_args[row]},
            {schema_madlib}.__matrix_densify_agg(
                {col_dim},
                CASE WHEN {in_args[val]} IS NULL THEN 1 ELSE {in_args[col]}::INTEGER END,
                coalesce({in_args[val]} * {scalar}, 0)) AS {out_args[val]}
        FROM
            {matrix_in} m
        RIGHT OUTER JOIN
            generate_series(1, {row_dim}) b
        ON m.{in_args[row]} = b AND m.{in_args[val]} IS NOT NULL
        GROUP BY b
        """.format(matrix_out=matrix_out,
                   out_args=out_args,
                   in_args=in_args,
                   schema_madlib=schema_madlib,
                   col_dim=col_dim,
                   row_dim=row_dim,
                   scalar=scalar,
                   matrix_in=matrix_in))
def _matrix_scalar_mult_sparse_to_sparse(matrix_in, in_args, scalar, matrix_out, out_args):
    """
    Multiply a sparse matrix by a (non-zero) scalar, keeping the sparse format.

    The scalar is bound as a FLOAT8 parameter of a prepared statement.

    Side effect:
        Creates matrix_out with one entry per non-NULL stored entry of
        matrix_in.
    """
    # Distribute by the output table's own row column: the input row column
    # name need not exist in matrix_out.
    plpy.execute("""
        CREATE TABLE {matrix_out}
            ({out_args[row]} BIGINT, {out_args[col]} BIGINT, {out_args[val]} FLOAT8)
            m4_ifdef(`__POSTGRESQL__', `',
                `WITH (APPENDONLY=TRUE) DISTRIBUTED BY ({out_args[row]})');
        """.format(matrix_out=matrix_out, out_args=out_args))
    # Explicit zero entries are kept. A sparse matrix records its dimensions
    # through its largest stored row/column ids (see _create_zero_matrix,
    # which stores only a zero entry at (row_dim, col_dim)), so dropping zero
    # entries could shrink the dimensions of the result.
    plan = plpy.prepare("""
        INSERT INTO {matrix_out}
        SELECT
            {in_args[row]}::BIGINT,
            {in_args[col]}::BIGINT,
            {in_args[val]} * $1
        FROM
            {matrix_in}
        WHERE
            {in_args[val]} IS NOT NULL
        """.format(matrix_out=matrix_out,
                   in_args=in_args,
                   matrix_in=matrix_in), ["FLOAT8"])
    plpy.execute(plan, [scalar])
def _matrix_scalar_mult_dense(schema_madlib, matrix_in, in_args,
                              scalar, matrix_out, out_args, fmt, in_dim):
    """
    Multiply a dense matrix by a scalar.

    For fmt == "sparse" each scaled row array is unnested next to a
    generate_series column id, producing (row, col, val) entries. Otherwise
    the scaled arrays are stored per row.
    """
    scaled = ("{schema_madlib}.array_scalar_mult("
              " {in_args[val]}::float8[],"
              " {scalar}::float8)").format(schema_madlib=schema_madlib,
                                            in_args=in_args,
                                            scalar=scalar)
    if fmt and fmt == "sparse":
        col_arg = "generate_series(1, {0})::BIGINT AS {1},".format(in_dim[1], out_args['col'])
        unnest_arg = "unnest(" + scaled + ")"
    else:
        col_arg = ''
        unnest_arg = scaled

    template = """
        CREATE TABLE {matrix_out} AS
        SELECT
            {in_args[row]}::BIGINT AS {out_args[row]},
            {col_arg}
            {unnest_arg} AS {out_args[val]}
        FROM
            {matrix_in}
        """
    plpy.execute(template.format(matrix_out=matrix_out,
                                 in_args=in_args,
                                 out_args=out_args,
                                 col_arg=col_arg,
                                 unnest_arg=unnest_arg,
                                 matrix_in=matrix_in))
def matrix_scalar_mult(schema_madlib, matrix_in, in_args, scalar, matrix_out, out_args):
    """ Multiply matrix with a scalar.

    Args:
        @param schema_madlib: Str, Name of the schema containing madlib functions
        @param matrix_in: Str, Name of the table containing matrix
        @param in_args: Str, Name-value pair string containing options for matrix_in
        @param scalar: Same as the type of elements in matrix_in
        @param matrix_out: Str, Name of the matrix to contain the scaled result
        @param out_args: Str, Key-value string providing column names for
                         matrix_out; an optional 'fmt' entry ('sparse' or
                         'dense') selects the output format. Like in_args,
                         this can also be dictionary.
    Returns:
        None
    Side effect:
        Creates matrix_out containing scalar * matrix_in.
    """
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)
    _assert(scalar is not None, "Matrix error: Invalid value for scalar")
    in_args = parse_matrix_args(in_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': in_args['row'],
                                                  'col': in_args['col'],
                                                  'val': in_args['val']})
    in_dim = get_dims(matrix_in, in_args)
    _assert(min(in_dim) > 0,
            "Matrix error: Invalid dimensions for input matrices")
    fmt = out_args['fmt'] if 'fmt' in out_args else None

    with plpy.subtransaction():
        if scalar == 0:
            # The result is all zeros regardless of the input values.
            _create_zero_matrix(schema_madlib, matrix_out, out_args, in_dim, fmt)
        elif _is_sparse(matrix_in, in_args['val']):
            validate_sparse(matrix_in, in_args)
            if fmt is None or fmt == 'sparse':
                _matrix_scalar_mult_sparse_to_sparse(matrix_in, in_args,
                                                     scalar, matrix_out, out_args)
            else:
                _matrix_scalar_mult_sparse_to_dense(schema_madlib, matrix_in,
                                                    in_args, scalar,
                                                    matrix_out, out_args, in_dim)
        else:
            validate_dense(matrix_in, in_args)
            _matrix_scalar_mult_dense(schema_madlib, matrix_in, in_args,
                                      scalar, matrix_out, out_args, fmt, in_dim)
# ------------------------------------------------------------------------------
def _matrix_vec_mult_dense(schema_madlib, matrix_in, in_args, vector):
    """
    Compute matrix_in * vector for a dense matrix.

    The vector is bound as a FLOAT8[] parameter of a prepared statement and
    dotted with every row array; the dot products are aggregated in row-id
    order.
    """
    sql = """
        SELECT
            array_agg(val::float8 order by row::bigint) as res
        FROM (
            SELECT
                {in_args[row]} as row,
                {schema_madlib}.array_dot({in_args[val]}::float8[],
                                          $1::float8[]) as val
            FROM
                {matrix_in}
        ) l
        """.format(in_args=in_args,
                   schema_madlib=schema_madlib,
                   matrix_in=matrix_in)
    statement = plpy.prepare(sql, ["FLOAT8[]"])
    rows = plpy.execute(statement, [vector])
    _assert(len(rows) == 1,
            "Matrix error: Invalid values in value column, " + in_args['val'])
    return rows[0]['res']
# ------------------------------------------------------------------------------
def matrix_vec_mult(schema_madlib, matrix_in, in_args, vector):
    """ Multiply matrix with vector.

    Args:
        @param schema_madlib: Str, Name of the schema containing madlib functions
        @param matrix_in: Str, Name of the table containing matrix
        @param in_args: Str, Name-value pair string containing options for matrix_in
        @param vector: List, whose type is the same as type of elements in matrix_in;
                       its length must equal the number of columns of matrix_in
    Returns:
        List, the result of matrix_in * vector (one element per row).
    Side effect:
        A sparse matrix_in is densified into a temporary table, which is
        dropped before returning.
    """
    _validate_input_table(matrix_in)
    _assert(vector is not None, "Matrix error: Invalid input for vector")
    in_args = parse_matrix_args(in_args)
    in_dim = get_dims(matrix_in, in_args)
    _assert(min(in_dim) > 0,
            "Matrix error: Invalid dimensions for input matrices")
    _assert(in_dim[1] == len(vector),
            "Matrix error: Dimension mismatch between matrix ({0[0]} x {0[1]}) "
            "and vector ({1} x 1)".format(in_dim, len(vector)))

    matrix_temp = None
    if _is_sparse(matrix_in, in_args['val']):
        validate_sparse(matrix_in, in_args)
        # The multiplication kernel works on dense rows only.
        matrix_temp = "pg_temp." + unique_string() + "_a5"
        matrix_densify(schema_madlib,
                       matrix_in, in_args,
                       matrix_temp, in_args)
        matrix_in = matrix_temp
    else:
        validate_dense(matrix_in, in_args)

    res = _matrix_vec_mult_dense(schema_madlib, matrix_in, in_args, vector)
    if matrix_temp is not None:
        plpy.execute("DROP TABLE IF EXISTS " + matrix_temp)
    return res
# ------------------------------------------------------------------------------
def matrix_scale_and_add(schema_madlib, matrix_a, a_args,
                         matrix_b, b_args, scale, matrix_out, out_args, **kwargs):
    """
    Compute R = A + scale * B for two dense matrices of equal dimensions.

    Args:
        @param schema_madlib Name of the schema where MADlib is installed
        @param matrix_a      Table holding dense matrix A
        @param a_args        Name-value option string for matrix_a
        @param matrix_b      Table holding dense matrix B
        @param b_args        Name-value option string for matrix_b
        @param scale         Scalar multiplier applied to B; must not be NULL
        @param matrix_out    Name of the output table R
        @param out_args      Name-value option string for matrix_out; row/col/val
                             default to those of matrix_a
    Returns:
        None; creates matrix_out as a dense table.
    Throws:
        plpy.error if any argument is invalid
    """
    # TODO: this function only works with dense matrices. Need to add sparse support
    for source in (matrix_a, matrix_b):
        _validate_input_table(source)
    _validate_output_table(matrix_out)
    a_args = parse_matrix_args(a_args)
    b_args = parse_matrix_args(b_args)
    out_args = parse_matrix_args(
        out_args,
        in_default_args=dict((key, a_args[key]) for key in ('row', 'col', 'val')))

    a_dim = get_dims(matrix_a, a_args)
    b_dim = get_dims(matrix_b, b_args)
    _assert(min(a_dim) > 0 and min(b_dim) > 0,
            "Matrix error: Invalid dimensions for input matrices")
    _assert(a_dim[0] == b_dim[0] and a_dim[1] == b_dim[1],
            "Matrix error: The dimensions of the two matrices don't match")
    validate_dense(matrix_a, a_args, row_dim=a_dim[0])
    validate_dense(matrix_b, b_args, row_dim=b_dim[0])
    if scale is None:
        plpy.error("Matrix error : Scale cannot be NULL")

    # Rows of A and B are matched on their bigint-cast row ids; each output
    # row is a_row + scale * b_row, computed element-wise.
    plpy.execute("""
        CREATE TABLE {matrix_out} AS
        SELECT a.{a_row} as {out_row},
               {schema_madlib}.array_add(
                   a.{a_val}::float8[],
                   {schema_madlib}.array_scalar_mult(
                       b.{b_val}::float8[],
                       ({scale})::DOUBLE PRECISION))
               as {out_val}
        FROM
            {matrix_a} as a, {matrix_b} as b
        WHERE a.{a_row}::bigint = b.{b_row}::bigint
        """.format(matrix_out=matrix_out, schema_madlib=schema_madlib,
                   a_row=a_args['row'], a_val=a_args['val'],
                   b_row=b_args['row'], b_val=b_args['val'],
                   out_row=out_args['row'], out_val=out_args['val'],
                   scale=scale, matrix_a=matrix_a, matrix_b=matrix_b))
# ------------------------------------------------------------------------------
# -----------------------------------------------------------------------
# Norm operations
# -----------------------------------------------------------------------
def _matrix_elem_norm(schema_madlib, matrix_in, in_args, p):
    """
    Compute the element-wise p-norm of a sparse or dense matrix, i.e.
    (sum over all entries of |a_ij|^p)^(1/p).

    Args:
        @param schema_madlib Name of the schema where MADlib is installed
        @param matrix_in     Name of the source table
        @param in_args       Source table format args (option string or the
                             already-parsed args)
        @param p             Element-wise norm value (expected positive)
    Returns:
        The norm as a float8 value
    Throws:
        plpy.error if any argument is invalid
    """
    _validate_input_table(matrix_in)
    # Parse before the first lookup so every use below sees normalized args.
    in_args = parse_matrix_args(in_args)
    if _is_sparse(matrix_in, in_args['val']):
        sum_fun = "sum"
        pow_fun = "pow"
        # abs() is required: without it an odd p sums signed powers of
        # negative entries, and a fractional p on a negative entry is
        # undefined (PostgreSQL pow raises an error).
        val_expr = "abs({0[val]}::float8)".format(in_args)
        validate_sparse(matrix_in, in_args)
    else:
        sum_fun = "{0}.array_sum".format(schema_madlib)
        pow_fun = "{0}.array_pow".format(schema_madlib)
        # Same reasoning as the sparse branch, applied element-wise.
        val_expr = "{0}.array_abs({1[val]}::float8[])".format(schema_madlib,
                                                            in_args)
        validate_dense(matrix_in, in_args)
    # Inner query: per-row (dense) or whole-table (sparse) sums of |a|^p;
    # outer query adds them up and takes the p-th root.
    norm = plpy.execute("""
        SELECT
            pow(sum(pow_norm), 1./{p}::float8) as res
        FROM (
            SELECT {sum_fun}(
                {pow_fun}({val_expr}, {p}::float8)) AS pow_norm
            FROM {matrix_in}
        ) t
        """.format(p=p, sum_fun=sum_fun, pow_fun=pow_fun,
                   val_expr=val_expr, matrix_in=matrix_in))
    _assert(norm is not None and len(norm) == 1 and 'res' in norm[0],
            "Matrix error: invalid result")
    return norm[0]['res']
# ------------------------------------------------------------------------------
def _matrix_induced_norm(schema_madlib, matrix_in, in_args, norm_type):
    """
    Compute a non-element-wise norm of a sparse or dense matrix.

    norm_type selects:
        'one'      - maximum over columns of the summed absolute values
        'infinity' - maximum over rows of the summed absolute values
        'max'      - maximum absolute entry
        'spectral' - delegated to _matrix_spectral_norm
    Any other value raises KeyError from the lookup tables below.

    Returns:
        The norm as a float8 value
    """
    _validate_input_table(matrix_in)
    if norm_type == "spectral":
        return _matrix_spectral_norm(schema_madlib, matrix_in, in_args)

    if _is_sparse(matrix_in, in_args['val']):
        validate_sparse(matrix_in, in_args)
        # (aggregate prefix, grouping clause); the inner query then yields
        # one value per group and the outer max() picks the largest.
        sparse_plans = {
            'one': ("sum(abs", "GROUP BY " + in_args['col']),
            'infinity': ("sum(abs", "GROUP BY " + in_args['row']),
            'max': ("max(abs", ""),
        }
        agg_double_fun, group_by_str = sparse_plans[norm_type]
        val_expr = "{0}::float8".format(in_args['val'])
    else:
        validate_dense(matrix_in, in_args)
        dense_templates = {
            'one': "{0}.array_max({0}.__matrix_column_abs_sum_agg",
            'infinity': "{0}.array_sum({0}.array_abs",
            'max': "{0}.array_max({0}.array_abs",
        }
        agg_double_fun = dense_templates[norm_type].format(schema_madlib)
        val_expr = "{0}::float8[]".format(in_args['val'])
        group_by_str = ""

    norm = plpy.execute("""
        SELECT max(v) as res
        FROM (
            -- output of inner query is 1 or more float8 values
            SELECT
                {agg}({val})) as v
            FROM
                {source}
            {group_by}
        ) t
        """.format(agg=agg_double_fun, val=val_expr, source=matrix_in,
                   group_by=group_by_str))
    _assert(norm is not None and len(norm) == 1 and 'res' in norm[0],
            "Matrix error: invalid result ")
    return norm[0]['res']
# ------------------------------------------------------------------------------
def _matrix_spectral_norm(schema_madlib, matrix_in, in_args):
    """
    Compute the spectral norm of a matrix via MADlib svd with k = 1.

    The input is first exposed as a dense relation with columns
    row_id / row_vec: a densified temp table for sparse input, or a temp
    view over the original columns for dense input. The (1, 1) entry of the
    svd singular-value table is returned. Every temporary relation created
    here is dropped before returning.
    """
    temp_table = None
    temp_view = None
    with plpy.subtransaction():
        if _is_sparse(matrix_in, in_args['val']):
            validate_sparse(matrix_in, in_args)
            temp_table = "pg_temp." + unique_string() + "_in"
            matrix_densify(schema_madlib, matrix_in, in_args,
                           temp_table, 'row=row_id,val=row_vec')
            svd_source = temp_table
        else:
            validate_dense(matrix_in, in_args)
            temp_view = "pg_temp." + unique_string() + "_view"
            plpy.execute("""
                CREATE VIEW {view} AS
                (SELECT
                    {row_col} as row_id,
                    {val_col} as row_vec
                 FROM {source})
                """.format(view=temp_view, row_col=in_args['row'],
                           val_col=in_args['val'], source=matrix_in))
            svd_source = temp_view

        # svd writes <prefix>_s, <prefix>_u and <prefix>_v
        svd_prefix = "pg_temp." + unique_string()
        s_table, u_table, v_table = [svd_prefix + suffix
                                     for suffix in ("_s", "_u", "_v")]
        plpy.execute("""
            SELECT
                {schema_madlib}.svd('{source}', '{prefix}', 'row_id', 1)
            """.format(schema_madlib=schema_madlib, source=svd_source,
                       prefix=svd_prefix))
        rows = plpy.execute("""
            SELECT value from {s_table}
            WHERE row_id = 1 AND col_id = 1 AND value is not NULL
            """.format(s_table=s_table))
        _assert(len(rows) == 1 and 'value' in rows[0],
                "Matrix error: Failed to calculate spectral norm")
        spectral_norm = rows[0]['value']

        if temp_table is not None:
            plpy.execute('DROP TABLE IF EXISTS ' + temp_table)
        if temp_view is not None:
            plpy.execute('DROP VIEW IF EXISTS ' + temp_view)
        for svd_table in (s_table, u_table, v_table):
            plpy.execute('DROP TABLE IF EXISTS ' + svd_table)
    return spectral_norm
# ------------------------------------------------------------------------------
def matrix_norm(schema_madlib, matrix_in, in_args, norm_type):
    """
    Compute a norm of a matrix.

    Args:
        @param schema_madlib Name of the schema where MADlib is installed
        @param matrix_in     Name of the source table
        @param in_args       Name-value option string for matrix_in
        @param norm_type     Either a (case-insensitive) prefix of one of the
                             supported norm names 'one', 'spectral',
                             'infinity', 'max', 'frobenius' (an empty/NULL
                             value means frobenius), or a positive number p
                             selecting the element-wise p-norm.
    Returns:
        @return norm value
    Throws:
        plpy.error if any argument is invalid
    """
    _validate_input_table(matrix_in)
    in_args = parse_matrix_args(in_args)
    norm_type = 'fro' if not norm_type else str(norm_type).lower()
    # norm_type can be a prefix of any of the below supported types; the
    # first match in list order wins.
    supported_norm_types = ['one', 'spectral', 'infinity', 'max', 'frobenius']
    norm_type_val = next((x for x in supported_norm_types
                          if x.startswith(norm_type)), None)
    if norm_type_val == "frobenius":
        # frobenius norm is same as element-wise 2-norm
        return _matrix_elem_norm(schema_madlib, matrix_in, in_args, 2)
    if norm_type_val is not None:
        return _matrix_induced_norm(schema_madlib, matrix_in, in_args,
                                    norm_type_val)

    # Not a supported name, so it has to be a positive float. Only the
    # conversion is guarded: an exception raised while computing the norm
    # must propagate rather than be reported as an invalid norm type.
    try:
        param = float(norm_type)
    except ValueError:
        plpy.error("Matrix Error: Invalid norm type: {0}. "
                   "It can either be a positive float or (subset string) of "
                   "one of the supported norm types ({1})"
                   .format(norm_type, ', '.join(sorted(supported_norm_types))))
    _assert(param > 0,
            "Matrix error: Element-wise norm ({0}) should be a positive value."
            .format(param))
    return _matrix_elem_norm(schema_madlib, matrix_in, in_args, param)
# ------------------------------------------------------------------------------
# -----------------------------------------------------------------------
# Decomposition operations
# -----------------------------------------------------------------------
def matrix_transform_helper(schema_madlib, matrix_in, in_args, matrix_out,
                            out_args, dim, agg_funcs_info):
    """
    Build matrix_out by applying a whole-matrix MADlib aggregate to matrix_in.

    agg_funcs_info maps 'sparse_agg' / 'dense_agg' to the aggregate name used
    for sparse / dense input. The aggregate result is split with get_row()
    into dim[1] dense rows. When out_args['fmt'] == 'sparse', those rows go to
    a temp table that is then sparsified into matrix_out and dropped.
    """
    fmt = out_args['fmt'] if 'fmt' in out_args else None
    val_col = in_args['val']
    if _is_sparse(matrix_in, val_col):
        validate_sparse(matrix_in, in_args)
        agg_str = """{schema}.{func}(
                {n_rows}::integer,
                {n_cols}::integer,
                ({row}-1)::integer,
                ({col}-1)::integer,
                ({val})::double precision
            )
            """.format(schema=schema_madlib, func=agg_funcs_info['sparse_agg'],
                       n_rows=dim[0], n_cols=dim[1], row=in_args['row'],
                       col=in_args['col'], val=val_col)
    else:
        validate_dense(matrix_in, in_args)
        agg_str = """{schema}.{func}(
                ({n_rows})::integer,
                ({row}-1)::integer,
                ({val})::double precision[]
            )
            """.format(schema=schema_madlib, func=agg_funcs_info['dense_agg'],
                       n_rows=dim[0], row=in_args['row'], val=val_col)

    to_sparse = (fmt == 'sparse')
    dense_target = "pg_temp." + unique_string() if to_sparse else matrix_out
    plpy.execute("""
        CREATE TABLE {target} AS
        SELECT row_id as {out_row},
               {schema}.get_row(res, row_id) as {out_val}
        FROM (
            SELECT {agg} as res
            FROM {source}
        ) q, generate_series(1, {n_out_rows}) as row_id
        """.format(target=dense_target, out_row=out_args['row'],
                   out_val=out_args['val'], schema=schema_madlib,
                   agg=agg_str, source=matrix_in, n_out_rows=dim[1]))
    if to_sparse:
        matrix_sparsify(schema_madlib, dense_target, out_args, matrix_out, out_args)
        plpy.execute('DROP TABLE IF EXISTS %s' % dense_target)
# ------------------------------------------------------------------------------
def matrix_inverse(schema_madlib, matrix_in, in_args, matrix_out, out_args):
    """ Compute the inverse of a square matrix.

    The inverse is typically dense even for sparse input, so it is always
    computed in dense form; out_args['fmt'] = 'sparse' converts the result
    into a sparse output table.

    Args:
        @param schema_madlib Name of the schema where MADlib is installed
        @param matrix_in     Name of the table containing the square matrix
        @param in_args       Name-value option string for matrix_in
        @param matrix_out    Name of the output table
        @param out_args      Name-value option string for matrix_out; row/col/val
                             default to those of matrix_in
    Returns:
        None
    """
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)
    in_args = parse_matrix_args(in_args)
    inherited = dict((key, in_args[key]) for key in ('row', 'col', 'val'))
    out_args = parse_matrix_args(out_args, in_default_args=inherited)
    dim = get_dims(matrix_in, in_args)
    _assert(min(dim) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    _assert(dim[0] == dim[1],
            "Matrix error: Inverse operation is only defined for square matrices")
    # FIXME: the inverse is computed on a single node after collating the
    # distributed matrix; GPDB's 1GB field limit (size * size * 8 bytes)
    # caps the size at about 11100.
    size_cap = 11100
    _assert(dim[0] <= size_cap,
            """Matrix error: Reached maximum limit for matrix inverse operation.
            Maximum limit for matrix size is {0} x {0} """.format(size_cap))
    matrix_transform_helper(schema_madlib, matrix_in, in_args, matrix_out,
                            out_args, dim,
                            {'sparse_agg': '__matrix_sparse_inverse',
                             'dense_agg': '__matrix_dense_inverse'})
# ------------------------------------------------------------------------------
def matrix_eigen(schema_madlib, matrix_in, in_args, matrix_out, out_args):
    """ Compute the eigenvalues of a square matrix.

    The eigenvalues are repeated according to their algebraic multiplicity,
    so the output has as many rows as the input matrix. Each value is stored
    as a {schema_madlib}.complex built from the first two entries of the
    corresponding result row; the value column defaults to 'eigen_values'.
    """
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)
    in_args = parse_matrix_args(in_args)
    dim = get_dims(matrix_in, in_args)
    n = dim[0]
    _assert(min(dim) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    _assert(n == dim[1],
            "Matrix error: Eigen value is only defined for square matrices")
    out_args = parse_matrix_args(out_args,
                                 in_default_args=dict(row=in_args['row'],
                                                      col=in_args['col'],
                                                      val='eigen_values'))
    # FIXME: computed on a single node after collating the distributed
    # matrix; GPDB's 1GB field limit (size * size * 8 bytes) caps the size
    # at about 11100.
    size_cap = 11100
    _assert(n <= size_cap,
            """Matrix error: Reached maximum limit for matrix eigen operation.
            Maximum limit for matrix size is {0} x {0} """.format(size_cap))

    if _is_sparse(matrix_in, in_args['val']):
        validate_sparse(matrix_in, in_args)
        agg_expr = """{schema}.__matrix_sparse_eigen(
                {n}::integer,
                {n}::integer,
                ({row}-1)::integer,
                ({col}-1)::integer,
                ({val})::double precision
            )
            """.format(schema=schema_madlib, n=n, row=in_args['row'],
                       col=in_args['col'], val=in_args['val'])
    else:
        validate_dense(matrix_in, in_args)
        agg_expr = """{schema}.__matrix_dense_eigen(
                ({n})::integer,
                ({row}-1)::integer,
                ({val})::double precision[]
            )
            """.format(schema=schema_madlib, n=n, row=in_args['row'],
                       val=in_args['val'])

    plpy.execute("""
        CREATE TABLE {target} AS
        SELECT row_id as {out_row},
               row(value[1], value[2])::{schema}.complex as {out_val}
        FROM (
            SELECT row_id,
                   {schema}.get_row(res, row_id) as value
            FROM (
                SELECT {agg} as res
                FROM {source}
            ) q, generate_series(1, {n}) as row_id ) m
        """.format(target=matrix_out, out_row=out_args['row'],
                   out_val=out_args['val'], schema=schema_madlib,
                   agg=agg_expr, source=matrix_in, n=n))
# ------------------------------------------------------------------------------
def matrix_cholesky(schema_madlib, matrix_in, in_args,
                    matrix_out_prefix, out_args):
    """ Compute the standard cholesky decomposition of a matrix.

    Cholesky decomposition of a matrix is typically a dense matrix, including
    sparse input matrices. Hence we always return a dense matrix result -
    out_args['fmt'] = 'sparse' ensures a sparsified output.

    Perform a robust Cholesky decomposition of a positive input matrix A, such
    that A=P^tLDL*P, where P is a permutation matrix, L is lower triangular with
    a unit diagonal and D is a diagonal matrix. C++ function returns
    [P, L, D] together, and we retrieve them respectively. NOTE: Because it is
    required that input matrix should be symmetric, it is only the lower
    triangular part that will be used for the decomposition. The upper
    triangular part won't be read.

    Args:
        @param schema_madlib     Name of the schema where MADlib is installed
        @param matrix_in         Name of the table containing the square input matrix
        @param in_args           Name-value option string for matrix_in
        @param matrix_out_prefix Prefix for the output tables; names are built
                                 with add_postfix(prefix, "_p" / "_l" / "_d")
        @param out_args          Name-value option string for the outputs;
                                 row/col/val default to those of matrix_in
    Returns:
        None
    Side effect:
        Creates the P, L and D output tables, each with dim[0] rows of
        dim[0] values.
    """
    _validate_input_table(matrix_in)
    _assert(matrix_out_prefix is not None and matrix_out_prefix.replace('"', '').strip() != '',
            "Matrix error: Invalid output prefix ({0})".format(matrix_out_prefix))
    # One output table per factor
    matrix_p, matrix_l, matrix_d = (
        add_postfix(matrix_out_prefix, i) for i in ("_p", "_l", "_d"))
    for each_output in (matrix_p, matrix_l, matrix_d):
        _validate_output_table(each_output)
    in_args = parse_matrix_args(in_args)
    # Output row/col/val names default to the input's names
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': in_args['row'],
                                                  'col': in_args['col'],
                                                  'val': in_args['val']})
    dim = get_dims(matrix_in, in_args)
    _assert(min(dim) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    _assert(dim[0] == dim[1],
            "Matrix error: Cholesky decomposition operation is only defined for square matrices")
    # FIXME: currently matrix cholesky is computed on a single node after collating
    # a distributed matrix. This places a limit on the maximum size of the matrix
    # GPDB limit = 1GB implying size * size < 10^9 / 8 i.e. size <= 11100
    max_size = 11100
    _assert(dim[0] <= max_size,
            """Matrix error: Reached maximum limit for matrix cholesky operation.
            Maximum limit for matrix size is {0} x {0} """.format(max_size))
    out_fmt = None if 'fmt' not in out_args else out_args['fmt']
    is_input_sparse = _is_sparse(matrix_in, in_args['val'])
    if is_input_sparse:
        validate_sparse(matrix_in, in_args)
        # The input is square (asserted above), so dim[0] is passed as both
        # the row and the column count. The variable is named qr_agg, but it
        # holds the Cholesky aggregate expression.
        qr_agg = """{schema_madlib}.__matrix_sparse_cholesky(
                {n_rows}::integer,
                {n_rows}::integer,
                ({in_args[row]}-1)::integer,
                ({in_args[col]}-1)::integer,
                ({in_args[val]})::double precision
            )
            """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                       in_args=in_args)
    else:
        validate_dense(matrix_in, in_args)
        qr_agg = """{schema_madlib}.__matrix_dense_cholesky(
                ({n_rows})::integer,
                ({in_args[row]}-1)::integer,
                ({in_args[val]})::double precision[]
            )
            """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                       in_args=in_args)
    # Materialize the aggregate once, one row per matrix row. The slices
    # below read row_vec[1 : 3 * dim[0]] as [P row | L row | D row].
    matrix_temp = "pg_temp." + unique_string()
    plpy.execute("""
        CREATE TABLE {matrix_temp} AS
        SELECT row_id,
               {schema_madlib}.get_row(res, row_id) as row_vec
        FROM (
            SELECT {qr_agg} as res
            FROM {matrix_in}
        ) q, generate_series(1, {dim[0]}) as row_id
        """.format(**locals()))
    # For sparse output each factor is first written densely to a temp table
    # and then sparsified into its final table.
    is_output_sparse = out_fmt and out_fmt == 'sparse'
    # P: entries 1 .. dim[0]
    matrix_p_temp = "pg_temp." + unique_string() if is_output_sparse else matrix_p
    plpy.execute("""
        CREATE TABLE {matrix_p_temp} AS
        SELECT row_id AS {out_args[row]},
               row_vec[1:{dim[0]}] AS {out_args[val]}
        FROM {matrix_temp}
        """.format(**locals()))
    if is_output_sparse:
        matrix_sparsify(schema_madlib, matrix_p_temp, out_args, matrix_p, out_args)
        plpy.execute('DROP TABLE IF EXISTS %s' % matrix_p_temp)
    # L: entries dim[0] + 1 .. 2 * dim[0]
    matrix_l_temp = "pg_temp." + unique_string() if is_output_sparse else matrix_l
    plpy.execute("""
        CREATE TABLE {matrix_l_temp} AS
        SELECT row_id AS {out_args[row]},
               row_vec[{dim[0]} + 1 : {dim[0]} * 2] AS {out_args[val]}
        FROM {matrix_temp}
        """.format(**locals()))
    if is_output_sparse:
        matrix_sparsify(schema_madlib, matrix_l_temp, out_args, matrix_l, out_args)
        plpy.execute('DROP TABLE IF EXISTS %s' % matrix_l_temp)
    # D: entries 2 * dim[0] + 1 .. 3 * dim[0]
    matrix_d_temp = "pg_temp." + unique_string() if is_output_sparse else matrix_d
    plpy.execute("""
        CREATE TABLE {matrix_d_temp} AS
        SELECT row_id AS {out_args[row]},
               row_vec[{dim[0]} * 2 + 1: {dim[0]} * 3] AS {out_args[val]}
        FROM {matrix_temp}
        """.format(**locals()))
    if is_output_sparse:
        matrix_sparsify(schema_madlib, matrix_d_temp, out_args, matrix_d, out_args)
        plpy.execute('DROP TABLE IF EXISTS %s' % matrix_d_temp)
    plpy.execute('DROP TABLE IF EXISTS %s' % matrix_temp)
# ------------------------------------------------------------------------------
def matrix_qr(schema_madlib, matrix_in, in_args, matrix_out_prefix, out_args):
    """ Compute the QR decomposition of a matrix.

    Performs a QR decomposition of a matrix A into matrices Q and R such that
    A = QR. The C++ aggregate returns, per row, the concatenation [Q, R]; with
    dim = (m, n) the pieces are split out as Q (m x m) and R (m x n).

    Q and R are typically dense even for sparse input, so they are always
    produced in dense form; out_args['fmt'] = 'sparse' converts each output
    table into sparse form.

    Args:
        @param schema_madlib     Name of the schema where MADlib is installed
        @param matrix_in         Name of the table containing the input matrix
        @param in_args           Name-value option string for matrix_in
        @param matrix_out_prefix Prefix for the output tables; names are built
                                 with add_postfix(prefix, "_q" / "_r")
        @param out_args          Name-value option string for the outputs;
                                 row/col/val default to those of matrix_in
    Returns:
        None
    """
    _validate_input_table(matrix_in)
    _assert(matrix_out_prefix is not None and
            matrix_out_prefix.replace('"', '').strip() != '',
            "Matrix error: Invalid output prefix ({0})".format(matrix_out_prefix))
    matrix_q = add_postfix(matrix_out_prefix, "_q")
    matrix_r = add_postfix(matrix_out_prefix, "_r")
    _validate_output_table(matrix_q)
    _validate_output_table(matrix_r)
    in_args = parse_matrix_args(in_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': in_args['row'],
                                                  'col': in_args['col'],
                                                  'val': in_args['val']})
    dim = get_dims(matrix_in, in_args)
    _assert(min(dim) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    # FIXME: currently decomposition is computed on a single node after collating
    # a distributed matrix. This places a limit on the maximum size of the matrix
    # GPDB limit = 1GB implying size * size < 10^9 / 8 i.e. size <= 11100
    max_size = 11100
    # The message previously named the cholesky operation (copy-paste error).
    _assert(dim[0] <= max_size,
            """Matrix error: Reached maximum limit for matrix QR decomposition operation.
            Maximum limit for matrix size is {0} x {0} """.format(max_size))
    out_fmt = None if 'fmt' not in out_args else out_args['fmt']
    is_input_sparse = _is_sparse(matrix_in, in_args['val'])
    if is_input_sparse:
        validate_sparse(matrix_in, in_args)
        qr_agg = """{schema_madlib}.__matrix_sparse_qr(
                {n_rows}::integer,
                {n_cols}::integer,
                ({in_args[row]}-1)::integer,
                ({in_args[col]}-1)::integer,
                ({in_args[val]})::double precision
            )
            """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                       n_cols=dim[1], in_args=in_args)
    else:
        validate_dense(matrix_in, in_args)
        qr_agg = """{schema_madlib}.__matrix_dense_qr(
                ({n_rows})::integer,
                ({in_args[row]}-1)::integer,
                ({in_args[val]})::double precision[]
            )
            """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                       in_args=in_args)
    # Materialize the aggregate once; each row_vec holds [Q row | R row].
    matrix_temp = "pg_temp." + unique_string()
    plpy.execute("""
        CREATE TABLE {matrix_temp} AS
        SELECT row_id,
               {schema_madlib}.get_row(res, row_id) as row_vec
        FROM (
            SELECT {qr_agg} as res
            FROM {matrix_in}
        ) q, generate_series(1, {dim[0]}) as row_id
        """.format(**locals()))
    is_output_sparse = out_fmt and out_fmt == 'sparse'
    # Q: entries 1 .. dim[0]
    matrix_q_temp = "pg_temp." + unique_string() if is_output_sparse else matrix_q
    plpy.execute("""
        CREATE TABLE {matrix_q_temp} AS
        SELECT row_id as {out_args[row]},
               row_vec[1:{dim[0]}] AS {out_args[val]}
        FROM {matrix_temp}
        """.format(**locals()))
    if is_output_sparse:
        matrix_sparsify(schema_madlib, matrix_q_temp, out_args, matrix_q, out_args)
        plpy.execute('DROP TABLE IF EXISTS %s' % matrix_q_temp)
    # R: entries dim[0] + 1 .. dim[0] + dim[1]
    matrix_r_temp = "pg_temp." + unique_string() if is_output_sparse else matrix_r
    plpy.execute("""
        CREATE TABLE {matrix_r_temp} AS
        SELECT row_id as {out_args[row]},
               row_vec[{dim[0]} + 1 : {dim[0]} + {dim[1]}] AS {out_args[val]}
        FROM {matrix_temp}
        """.format(**locals()))
    if is_output_sparse:
        matrix_sparsify(schema_madlib, matrix_r_temp, out_args, matrix_r, out_args)
        plpy.execute('DROP TABLE IF EXISTS %s' % matrix_r_temp)
    plpy.execute('DROP TABLE IF EXISTS %s' % matrix_temp)
# ------------------------------------------------------------------------------
def matrix_eval_helper(schema_madlib, matrix_in, in_args, dim, eval_funcs_info):
    """
    Evaluate a scalar-valued whole-matrix MADlib aggregate over matrix_in.

    Args:
        @param schema_madlib   Name of the schema where MADlib is installed
        @param matrix_in       Name of the table containing the matrix
        @param in_args         Parsed format args for matrix_in
        @param dim             Dimensions of matrix_in: dim[0] rows, dim[1] columns
        @param eval_funcs_info Dict mapping 'sparse_eval' / 'dense_eval' to the
                               aggregate used for sparse / dense input
    Returns:
        The value of the aggregate
    Throws:
        plpy.error (via _assert) unless exactly one result row is produced
    """
    is_input_sparse = _is_sparse(matrix_in, in_args['val'])
    if is_input_sparse:
        validate_sparse(matrix_in, in_args)
        # Callers (rank, nuclear norm) accept non-square matrices, so the
        # sparse aggregate must be given the real column count dim[1];
        # passing dim[0] twice mis-sizes the matrix when dim[0] != dim[1].
        eval_agg = """{schema_madlib}.{eval_func}(
                {n_rows}::integer,
                {n_cols}::integer,
                ({in_args[row]}-1)::integer,
                ({in_args[col]}-1)::integer,
                ({in_args[val]})::double precision
            )
            """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                       n_cols=dim[1], in_args=in_args,
                       eval_func=eval_funcs_info['sparse_eval'])
    else:
        validate_dense(matrix_in, in_args)
        # Dense aggregates take the column count from the row arrays.
        eval_agg = """{schema_madlib}.{eval_func}(
                ({n_rows})::integer,
                ({in_args[row]}-1)::integer,
                ({in_args[val]})::double precision[]
            )
            """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                       in_args=in_args, eval_func=eval_funcs_info['dense_eval'])
    r = plpy.execute("""
        SELECT {eval_agg} as res FROM {matrix_in}
        """.format(eval_agg=eval_agg, matrix_in=matrix_in))
    _assert(r and len(r) == 1,
            "Matrix error: Invalid evaluation for matrix {0}".format(matrix_in))
    return r[0]['res']
# ------------------------------------------------------------------------------
def matrix_rank(schema_madlib, matrix_in, in_args):
    """ Compute the rank of a matrix.

    The matrix is collated and evaluated on a single node by a MADlib
    aggregate (__matrix_sparse_rank or __matrix_dense_rank, chosen from the
    input format). No output table is created.

    Args:
        @param schema_madlib Name of the schema where MADlib is installed
        @param matrix_in     Name of the table containing the matrix
        @param in_args       Name-value option string for matrix_in
    Returns:
        The value produced by the rank aggregate
    """
    _validate_input_table(matrix_in)
    in_args = parse_matrix_args(in_args)
    dim = get_dims(matrix_in, in_args)
    _assert(min(dim) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    # FIXME: currently matrix rank is computed on a single node after collating
    # a distributed matrix. This places a limit on the maximum size of the matrix
    # GPDB limit = 1GB implying size * size < 10^9 / 8 i.e. size <= 11100
    max_size = 11100
    # The message previously named the inverse operation (copy-paste error).
    _assert(dim[0] <= max_size,
            """Matrix error: Reached maximum limit for matrix rank operation.
            Maximum limit for matrix size is {0} x {0} """.format(max_size))
    eval_funcs_info = {'sparse_eval': '__matrix_sparse_rank',
                       'dense_eval': '__matrix_dense_rank'}
    return matrix_eval_helper(schema_madlib, matrix_in, in_args, dim, eval_funcs_info)
# ------------------------------------------------------------------------------
def matrix_lu(schema_madlib, matrix_in, in_args,
              matrix_out_prefix, out_args):
    """ Compute the full pivoting LU decomposition of a matrix.

    Performs LU decomposition of a matrix with complete pivoting: the matrix A
    is decomposed as A = P^-1LUQ^-1, where L is unit-lower-triangular, U is
    upper-triangular, and P and Q are permutation matrices. The C++ aggregate
    returns, per row, the concatenation [P, L, U, Q]. With dim = (m, n) the
    pieces are split out as P (m x m), L (m x m), U (m x n) and Q (n x n), so
    their shapes differ when A is not square.

    The factors are typically dense even for sparse input, so they are always
    produced in dense form; out_args['fmt'] = 'sparse' converts every output
    table into sparse form.

    Args:
        @param schema_madlib     Name of the schema where MADlib is installed
        @param matrix_in         Name of the table containing the input matrix
        @param in_args           Name-value option string for matrix_in
        @param matrix_out_prefix Prefix for the output tables; names are built
                                 with add_postfix(prefix, "_p"/"_l"/"_u"/"_q")
        @param out_args          Name-value option string for the outputs;
                                 row/col/val default to those of matrix_in
    Returns:
        None
    """
    _validate_input_table(matrix_in)
    _assert(matrix_out_prefix is not None and
            matrix_out_prefix.replace('"', '').strip() != '',
            "Matrix error: Invalid output prefix ({0})".format(matrix_out_prefix))
    matrix_output_names = dict([(i, add_postfix(matrix_out_prefix, "_" + i))
                                for i in ("p", "l", "u", "q")])
    for each_output in matrix_output_names.values():
        _validate_output_table(each_output)
    in_args = parse_matrix_args(in_args)
    default_args = {'row': in_args['row'], 'col': in_args['col'], 'val': in_args['val']}
    out_args = parse_matrix_args(out_args, in_default_args=default_args)
    dim = get_dims(matrix_in, in_args)
    _assert(min(dim) > 0, "Matrix error: Invalid dimensions for input matrix")
    # FIXME: currently matrix full pivoting decomposition is computed on a single node
    # after collating a distributed matrix. This places a limit on the maximum size of
    # the matrix GPDB limit = 1GB implying size * size < 10^9 / 8 i.e. size <= 11100
    max_size = 11100
    # The message previously named the inverse operation (copy-paste error).
    _assert(dim[0] <= max_size,
            """Matrix error: Reached maximum limit for matrix LU decomposition operation.
            Maximum limit for matrix size is {0} x {0} """.format(max_size))
    out_fmt = None if 'fmt' not in out_args else out_args['fmt']
    is_input_sparse = _is_sparse(matrix_in, in_args['val'])
    if is_input_sparse:
        validate_sparse(matrix_in, in_args)
        # Non-square input is supported, so the sparse aggregate must get the
        # real column count dim[1]; passing dim[0] twice mis-sizes the matrix
        # whenever dim[0] != dim[1].
        lu_agg = """{schema_madlib}.__matrix_sparse_lu(
                {n_rows}::integer,
                {n_cols}::integer,
                ({in_args[row]}-1)::integer,
                ({in_args[col]}-1)::integer,
                ({in_args[val]})::double precision
            )
            """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                       n_cols=dim[1], in_args=in_args)
    else:
        validate_dense(matrix_in, in_args)
        lu_agg = """{schema_madlib}.__matrix_dense_lu(
                ({n_rows})::integer,
                ({in_args[row]}-1)::integer,
                ({in_args[val]})::double precision[]
            )
            """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                       in_args=in_args)
    is_output_sparse = out_fmt and out_fmt == 'sparse'
    matrix_temp = "pg_temp." + unique_string()
    # number of rows in result is determined by the max(dim): P, L, U need
    # dim[0] rows while Q needs dim[1] rows.
    row_count = max(dim)
    plpy.execute("""
        CREATE TABLE {matrix_temp} AS
        SELECT row_id,
               {schema_madlib}.get_row(res, row_id) as row_vec
        FROM (
            SELECT {lu_agg} as res
            FROM {matrix_in}
        ) q, generate_series(1, {row_count}) as row_id
        """.format(**locals()))
    # For sparse output each factor is written densely to its own temp table
    # first; otherwise it goes straight to the final table.
    matrix_temp_names = dict([
        (k, 'pg_temp.' + unique_string() if is_output_sparse else v)
        for k, v in matrix_output_names.items()])
    # P: entries 1 .. dim[0], first dim[0] rows
    plpy.execute("""
        CREATE TABLE {matrix_temp_names[p]} AS
        SELECT row_id AS {out_args[row]},
               row_vec[1:{dim[0]}] AS {out_args[val]}
        FROM {matrix_temp} WHERE row_id <= {dim[0]}
        """.format(**locals()))
    # Q: last dim[1] entries, first dim[1] rows
    plpy.execute("""
        CREATE TABLE {matrix_temp_names[q]} AS
        SELECT row_id AS {out_args[row]},
               row_vec[2 * {dim[0]} + {dim[1]} + 1:2 * {dim[0]} + 2 * {dim[1]}] AS {out_args[val]}
        FROM {matrix_temp} WHERE row_id <= {dim[1]}
        """.format(**locals()))
    # L: entries dim[0] + 1 .. 2 * dim[0], first dim[0] rows
    plpy.execute("""
        CREATE TABLE {matrix_temp_names[l]} AS
        SELECT row_id AS {out_args[row]},
               row_vec[{dim[0]} + 1 : 2 * {dim[0]}] AS {out_args[val]}
        FROM {matrix_temp} WHERE row_id <= {dim[0]}
        """.format(**locals()))
    # U: entries 2 * dim[0] + 1 .. 2 * dim[0] + dim[1], first dim[0] rows
    plpy.execute("""
        CREATE TABLE {matrix_temp_names[u]} AS
        SELECT row_id AS {out_args[row]},
               row_vec[2 * {dim[0]} + 1 : 2 * {dim[0]} + {dim[1]}] AS {out_args[val]}
        FROM {matrix_temp} WHERE row_id <= {dim[0]}
        """.format(**locals()))
    if is_output_sparse:
        # Pair each temp table with its final table by key; zipping the two
        # dicts' .values() relies on both dicts iterating in the same order,
        # which is not guaranteed.
        for key, output in matrix_output_names.items():
            temp = matrix_temp_names[key]
            matrix_sparsify(schema_madlib, temp, out_args, output, out_args)
            plpy.execute('DROP TABLE IF EXISTS %s' % temp)
    plpy.execute('DROP TABLE IF EXISTS %s' % matrix_temp)
# ------------------------------------------------------------------------------
def matrix_nuclear_norm(schema_madlib, matrix_in, in_args):
    """ Compute the nuclear norm of a matrix.

    The matrix is collated and evaluated on a single node by a MADlib
    aggregate (__matrix_sparse_nuclear_norm or __matrix_dense_nuclear_norm,
    chosen from the input format). No output table is created.

    Args:
        @param schema_madlib Name of the schema where MADlib is installed
        @param matrix_in     Name of the table containing the matrix
        @param in_args       Name-value option string for matrix_in
    Returns:
        The value produced by the nuclear-norm aggregate
    """
    _validate_input_table(matrix_in)
    in_args = parse_matrix_args(in_args)
    dim = get_dims(matrix_in, in_args)
    _assert(min(dim) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    # FIXME: currently the nuclear norm is computed on a single node after collating
    # a distributed matrix. This places a limit on the maximum size of the matrix
    # GPDB limit = 1GB implying size * size < 10^9 / 8 i.e. size <= 11100
    max_size = 11100
    # The message previously named the inverse operation (copy-paste error).
    _assert(dim[0] <= max_size,
            """Matrix error: Reached maximum limit for matrix nuclear norm operation.
            Maximum limit for matrix size is {0} x {0} """.format(max_size))
    eval_funcs_info = {'sparse_eval': '__matrix_sparse_nuclear_norm',
                       'dense_eval': '__matrix_dense_nuclear_norm'}
    return matrix_eval_helper(schema_madlib, matrix_in, in_args, dim, eval_funcs_info)
# ------------------------------------------------------------------------------
def matrix_pinv(schema_madlib, matrix_in, in_args,
                matrix_out, out_args):
    """ Compute the generalized (pseudo-) inverse of a matrix.

    The result is typically dense even for sparse input, so it is always
    computed in dense form; out_args['fmt'] = 'sparse' converts it into a
    sparse output table.

    Args:
        @param schema_madlib Name of the schema where MADlib is installed
        @param matrix_in     Name of the table containing the input matrix
        @param in_args       Name-value option string for matrix_in
        @param matrix_out    Name of the output table
        @param out_args      Name-value option string for matrix_out; row/col/val
                             default to those of matrix_in
    Returns:
        None
    """
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)
    in_args = parse_matrix_args(in_args)
    inherited = dict((name, in_args[name]) for name in ('row', 'col', 'val'))
    out_args = parse_matrix_args(out_args, in_default_args=inherited)
    dim = get_dims(matrix_in, in_args)
    _assert(min(dim) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    # FIXME: computed on a single node after collating the distributed
    # matrix; GPDB's 1GB field limit (size * size * 8 bytes) caps the size
    # at about 11100.
    limit = 11100
    _assert(dim[0] <= limit,
            """Matrix error: Reached maximum limit for matrix inverse operation.
            Maximum limit for matrix size is {0} x {0} """.format(limit))
    matrix_transform_helper(schema_madlib, matrix_in, in_args, matrix_out,
                            out_args, dim,
                            {'sparse_agg': '__matrix_sparse_pinv',
                             'dense_agg': '__matrix_dense_pinv'})
# ------------------------------------------------------------------------------
# -----------------------------------------------------------------------
# Creation operations
# -----------------------------------------------------------------------
def _matrix_diag_sparse(schema_madlib, diag_elements, matrix_out, out_args):
    """ Write diag_elements into matrix_out as a sparse diagonal matrix.

    Each element k (1-based) becomes the entry (k, k). Zero entries are
    skipped except for the last diagonal position, which is always written
    (presumably so the matrix dimension can be recovered from the table).
    """
    row_col, col_col, val_col = out_args['row'], out_args['col'], out_args['val']
    plpy.execute("""
        CREATE TABLE {target}
        ({r} INTEGER, {c} INTEGER, {v} FLOAT8)
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE) DISTRIBUTED BY ({r})');
        """.format(target=matrix_out, r=row_col, c=col_col, v=val_col))
    n_diag = len(diag_elements)
    insert_sql = """
        INSERT INTO
            {target}
        SELECT
            num as {r},
            num as {c},
            val as {v}
        FROM (
            SELECT
                unnest($1) as val,
                generate_series(1, {n}) as num
        ) l1
        WHERE val <> 0.0 or num = {n}
        """.format(target=matrix_out, r=row_col, c=col_col, v=val_col, n=n_diag)
    plpy.execute(plpy.prepare(insert_sql, ["FLOAT8[]"]), [diag_elements])
def _matrix_diag_dense(schema_madlib, diag_elements, matrix_out, out_args):
    """ Write diag_elements into matrix_out as a dense diagonal matrix.

    One output row is produced per element: each value is paired with its
    1-based position and passed to __matrix_densify_agg together with the
    dimension len(diag_elements), grouped by that position.
    """
    size = len(diag_elements)
    create_sql = """
        CREATE TABLE {target}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            row as {r},
            {schema}.__matrix_densify_agg({size}, row, val) AS {v}
        FROM (
            SELECT
                unnest($1) AS val,
                generate_series(1, {size}) AS row
        ) l
        GROUP BY l.row
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({r})')
        """.format(target=matrix_out, r=out_args['row'], v=out_args['val'],
                   schema=schema_madlib, size=size)
    plpy.execute(plpy.prepare(create_sql, ["FLOAT8[]"]), [diag_elements])
def matrix_diag(schema_madlib, diag_elements, matrix_out, out_args):
    """ Create a square matrix with the given vector on its main diagonal.
    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param diag_elements: array, Values to place on the main diagonal.
                              Requires not NULL, not empty, no NULL elements
        @param matrix_out: str, Name of the table to store the result matrix
        @param out_args: str, Name-value pair string containing options for
                         matrix_out. Column names default to row/col/val; the
                         matrix is written in sparse format unless fmt is
                         given with a value other than sparse
    Returns:
        None
    Side effect:
        Creates the table matrix_out holding the diagonal matrix
    """
    _validate_input_array(diag_elements)
    _validate_output_table(matrix_out)
    default_names = {'row': 'row', 'col': 'col', 'val': 'val'}
    parsed_args = parse_matrix_args(out_args, in_default_args=default_names)

    # Dense only when a format is requested explicitly and it is not sparse.
    wants_dense = 'fmt' in parsed_args and parsed_args['fmt'] != 'sparse'
    writer = _matrix_diag_dense if wants_dense else _matrix_diag_sparse
    writer(schema_madlib, diag_elements, matrix_out, parsed_args)
# ------------------------------------------------------------------------------
def matrix_identity(schema_madlib, dim, matrix_out, out_args):
    """ Create a dim x dim identity matrix.
    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param dim: integer, Dimensionality of the square matrix.
                    Requires not NULL and dim > 0
        @param matrix_out: str, Name of the table to store the identity matrix
        @param out_args: str, Name-value pair string containing options for
                         matrix_out (forwarded unchanged to matrix_diag)
    Returns:
        None
    Side effect:
        Creates matrix_out through matrix_diag, with every diagonal entry 1
    """
    _assert(dim and dim > 0,
            "Invalid input for dimensionality of matrix")
    # The identity is simply the diagonal matrix whose diagonal is all ones.
    matrix_diag(schema_madlib, [1] * dim, matrix_out, out_args)
# ------------------------------------------------------------------------------
def _matrix_create_scalar(schema_madlib, row_dim, col_dim, matrix_out, out_args, scalar):
    """ Create a row_dim by col_dim matrix every element of which is scalar.
    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param row_dim: int, The number of rows of matrix (must be > 0)
        @param col_dim: int, The number of columns of matrix (must be > 0)
        @param matrix_out: str, Name of the table to store the result matrix
        @param out_args: str, Name-value pair string containing options for matrix_out
        @param scalar: float, Value of every element in matrix_out; must not be None
    Returns:
        None
    Side effect:
        Creates matrix_out. The layout depends on the fmt option:
          - sparse (default), scalar non-zero: one (row, col, scalar) entry
            per cell.
          - sparse (default), scalar zero: a single entry (row_dim, col_dim, 0)
            instead of row_dim * col_dim explicit zeros.
          - any other fmt: one row per matrix row whose value column is
            array_fill(array_of_float(col_dim), scalar), presumably a
            length-col_dim array filled with scalar.
    """
    _validate_output_table(matrix_out)
    # Test for None explicitly: on Python 3 comparing None with 0 raises a
    # TypeError instead of reporting the intended validation error.
    _assert(row_dim is not None and col_dim is not None and
            row_dim > 0 and col_dim > 0,
            "Matrix error: Invalid dimensions for input matrices: {0} x {1}".
            format(row_dim, col_dim))
    _assert(scalar is not None,
            "Matrix error: Invalid none value for elements.")
    out_args = parse_matrix_args(
        out_args, in_default_args={'row': 'row', 'col': 'col', 'val': 'val'})
    if 'fmt' not in out_args or out_args['fmt'] == 'sparse':
        if scalar != 0:
            # Cross join of the two index ranges: one entry per cell.
            plpy.execute("""
CREATE TABLE {matrix_out}
m4_ifdef(`__POSTGRESQL__', `',
`WITH (APPENDONLY=TRUE)') AS
SELECT
{out_args[row]},
{out_args[col]},
{scalar}::FLOAT8 as {out_args[val]}
FROM
generate_series(1, {row_dim}) {out_args[row]},
generate_series(1, {col_dim}) {out_args[col]}
m4_ifdef(`__POSTGRESQL__', `',
`DISTRIBUTED BY ({out_args[row]})')
""".format(**locals()))
        else:
            # All zeros: store only the bottom-right cell so the table
            # still carries the matrix dimensions.
            plpy.execute("""
CREATE TABLE {matrix_out}
m4_ifdef(`__POSTGRESQL__', `',
`WITH (APPENDONLY=TRUE)') AS
SELECT {row_dim} as {out_args[row]},
{col_dim} as {out_args[col]},
0::FLOAT8 as {out_args[val]}
m4_ifdef(`__POSTGRESQL__', `',
`DISTRIBUTED BY ({out_args[row]})')
""".format(**locals()))
    else:
        # Qualify the helper functions with schema_madlib instead of a
        # hard-coded madlib schema, so this also works when MADlib is
        # installed in a differently named schema.
        plpy.execute("""
CREATE TABLE {matrix_out}
m4_ifdef(`__POSTGRESQL__', `',
`WITH (APPENDONLY=TRUE)') AS
SELECT
row as {out_args[row]},
{schema_madlib}.array_fill({schema_madlib}.array_of_float({col_dim}),
{scalar}::FLOAT8) as {out_args[val]}
FROM
generate_series(1, {row_dim}) as row
m4_ifdef(`__POSTGRESQL__', `',
`DISTRIBUTED BY ({out_args[row]})')
""".format(**locals()))
def matrix_zeros(schema_madlib, row_dim, col_dim, matrix_out, out_args):
    """ Create a row_dim by col_dim matrix whose elements are all 0.

    Thin wrapper over _matrix_create_scalar with scalar = 0. In the sparse
    format this stores a single zero entry at (row_dim, col_dim) rather
    than every cell.
    """
    fill_value = 0
    _matrix_create_scalar(schema_madlib, row_dim, col_dim,
                          matrix_out, out_args, fill_value)
def matrix_ones(schema_madlib, row_dim, col_dim, matrix_out, out_args):
    """ Create a row_dim by col_dim matrix whose elements are all 1.

    Thin wrapper over _matrix_create_scalar with scalar = 1.
    """
    fill_value = 1
    _matrix_create_scalar(schema_madlib, row_dim, col_dim,
                          matrix_out, out_args, fill_value)
def matrix_random(schema_madlib, distribution, row_dim, col_dim,
                  in_args, matrix_out, out_args):
    """ Generate a row_dim x col_dim random matrix sampled from given distribution
    Args:
        schema_madlib: MADlib schema name
        distribution: Str, Name of the sampling distribution. Any
                      case-insensitive prefix of a supported name is
                      accepted (e.g. 'norm'); an empty/None value selects
                      uniform.
                      Supported names: uniform, normal, bernoulli
        row_dim: int, Target matrix row dimensionality (must be > 0)
        col_dim: int, Target matrix col dimensionality (must be > 0)
        in_args: str, Distribution parameters in key=value pairs.
                 Supported parameters [defaults]:
                    All: seed [random int in 0..1000],
                         temp_out [False] (create a TEMP table)
                    Normal: mu [0], sigma [1]
                    Uniform: min [0], max [1]
                    Bernoulli: lower [0.0], upper [1.0], prob [0.5]
        matrix_out: str, Name of the table to create
        out_args: str, Name-value pair string containing options for
                  matrix_out (output column names)
    Returns:
        None
    Side effect:
        Creates matrix_out in dense layout: one row per matrix row, whose
        value column is schema_madlib.__<dist>_vector(col_dim, <params>,
        seed + row).
    """
    _validate_output_table(matrix_out)
    # Test for None explicitly: on Python 3 comparing None with 0 raises a
    # TypeError instead of reporting the intended validation error.
    _assert(row_dim is not None and col_dim is not None and
            row_dim > 0 and col_dim > 0,
            "Matrix error: Invalid dimensions for input matrices: {0} x {1}".
            format(row_dim, col_dim))
    out_args = parse_matrix_args(
        out_args, in_default_args={'row': 'row', 'col': 'col', 'val': 'val'})

    supported_dist = ['uniform', 'normal', 'bernoulli']
    if distribution:
        # Case-insensitive prefix match: 'n', 'Norm', 'NORMAL' -> 'normal'.
        requested = distribution.lower()
        matched = next((d for d in supported_dist if d.startswith(requested)),
                       None)
        if matched is None:
            # plpy.error raises, so execution stops here.
            plpy.error("Matrix Error: Invalid distribution: "
                       "{0}. Supported distributions: ({1})"
                       .format(distribution, ', '.join(sorted(supported_dist))))
        distribution = matched
    else:
        distribution = 'uniform'

    # Distribution-specific parameters: their default values, and the order
    # in which they are passed to __<dist>_vector (after the vector length
    # and before the per-row seed).
    dist_params = {
        'uniform': ({'min': 0, 'max': 1}, ['min', 'max']),
        'normal': ({'mu': 0, 'sigma': 1}, ['mu', 'sigma']),
        'bernoulli': ({'upper': 1., 'lower': 0., 'prob': 0.5},
                      ['upper', 'lower', 'prob']),
    }
    param_defaults, param_order = dist_params[distribution]

    in_args_default = {'seed': randint(0, 1000), 'temp_out': False}
    in_args_default.update(param_defaults)
    in_args_types = {'seed': int, 'temp_out': bool}
    in_args_types.update(dict((name, float) for name in param_order))
    in_args_vals = extract_keyvalue_params(in_args, in_args_types,
                                           in_args_default)

    # Each output row gets its own seed: the base seed plus the row index.
    vector_params = ', '.join('{0}'.format(in_args_vals[name])
                              for name in param_order)
    random_vector = ("{schema_madlib}.__{dist}_vector("
                     "{col_dim}, {params}, {seed} + row)"
                     .format(schema_madlib=schema_madlib, dist=distribution,
                             col_dim=col_dim, params=vector_params,
                             seed=in_args_vals['seed']))
    temp_str = "TEMP" if in_args_vals['temp_out'] else ""
    plpy.execute("""
CREATE {temp_str} TABLE {matrix_out}
m4_ifdef(`__POSTGRESQL__', `',
`WITH (APPENDONLY=TRUE)') AS
SELECT
row as {out_args[row]},
{random_vector} as {out_args[val]}
FROM
generate_series(1, {row_dim}) as row
m4_ifdef(`__POSTGRESQL__', `',
`DISTRIBUTED BY ({out_args[row]})')
""".format(temp_str=temp_str, matrix_out=matrix_out, out_args=out_args,
           random_vector=random_vector, row_dim=row_dim))