| """@file matrix_ops.py_in |
| |
| @namespace linalg |
| """ |
| |
| import plpy |
| import sys |
| from random import randint |
| |
| from utilities.utilities import __mad_version |
| from utilities.utilities import extract_keyvalue_params |
| from utilities.utilities import unique_string |
| from utilities.utilities import _assert |
| from utilities.utilities import add_postfix |
| from utilities.validate_args import get_cols |
| from utilities.validate_args import is_col_array |
| from utilities.validate_args import table_exists |
| from utilities.validate_args import table_is_empty |
| from utilities.validate_args import columns_exist_in_table |
| |
| version_wrapper = __mad_version() |
| string_to_array = version_wrapper.select_vecfunc() |
| array_to_string = version_wrapper.select_vec_return() |
| |
| |
| # ------------------------------------------------------------------------------ |
| # -- Utility functions --------------------------------------------------------- |
| # ------------------------------------------------------------------------------ |
| |
| |
def _matrix_column_to_array_format(source_table, row_id, output_table,
                                   istemp=False):
    """
    Convert a dense matrix in the column format into the array format

    Args:
        @param source_table: str, Table with one column per matrix column
                             plus a row-id column
        @param row_id: str, Name of the column containing the row id
        @param output_table: str, Table to create with (row_id, row_vec) rows
        @param istemp: bool, If True the output table is created as TEMP

    Returns:
        None

    Side effect:
        Creates output_table where all non-row_id columns of source_table are
        packed into a single 'row_vec' double precision array column.
    """
    _validate_output_table(output_table)
    _validate_input_table(source_table)
    # Resolve the schema-qualified name of the source table via the catalog
    row = plpy.execute("""
        SELECT nspname AS table_schema, relname AS table_name
        FROM pg_class AS c, pg_namespace AS nsp
        WHERE c.oid = '{source_table}'::regclass::oid AND
              c.relnamespace = nsp.oid
        """.format(source_table=source_table))
    table_schema = row[0]['table_schema']
    table_name = row[0]['table_name']

    numeric_types = set(['smallint', 'integer', 'bigint',
                         'real', 'numeric', 'double precision'])
    # Fetch all column names (quoted) with their types, in declaration order
    all_columns = plpy.execute("""
        SELECT quote_ident(column_name) as column_name, data_type
        FROM
            information_schema.columns
        WHERE
            table_schema = '{table_schema}' AND
            table_name = '{table_name}'
        ORDER BY ordinal_position
        """.format(table_schema=table_schema,
                   table_name=table_name))

    all_col_names = [column['column_name'] for column in all_columns]
    numeric_col_names = set(column['column_name'] for column in all_columns
                            if column['data_type'] in numeric_types)

    _assert(row_id in all_col_names, 'No row_id in the input table')
    all_col_names.remove(row_id)
    # Every remaining (value) column must be numeric for the float8[] cast
    _assert(numeric_col_names.issuperset(set(all_col_names)),
            "Not all columns are numeric!")

    plpy.execute("""
        CREATE {temp_str} TABLE {output_table}
        m4_ifdef(`__POSTGRESQL__', `',
                 `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {row_id} as row_id,
            array[{val_col_names}]::double precision[] AS row_vec
        FROM
            {source_table}
        m4_ifdef(`__POSTGRESQL__', `',
                 `DISTRIBUTED BY (row_id)')
        """.format(output_table=output_table, row_id=row_id,
                   source_table=source_table,
                   temp_str=("", "TEMP")[istemp],
                   val_col_names=','.join(all_col_names)))
| |
| |
def create_temp_sparse_matrix_table_with_dims(source_table,
                                              out_table,
                                              row_id, col_id, value,
                                              row_dim, col_dim,
                                              sparse_where_condition=None):
    """
    Make a copy of the input sparse table and pad it to the given dimensions

    Copies all non-NULL entries of the source sparse matrix into out_table.
    If the copy's dimensions (as computed from the data) do not already equal
    (row_dim, col_dim), the entry (row_dim, col_dim, 0) is inserted so that
    the matrix extent is explicit.

    NOTE(review): despite the function name, the output table is created as a
    regular (non-TEMP) table — confirm against callers.

    Args:
        @param source_table  Source table (sparse matrix)
        @param out_table     Output table (sparse matrix)
        @param row_id        Name of the row-index column
        @param col_id        Name of the column-index column
        @param value         Name of the value column
        @param row_dim       Row dimension to pad to
        @param col_dim       Column dimension to pad to
        @param sparse_where_condition  Optional extra SQL appended to the
                                       WHERE clause (e.g. "AND {col} < 10")
    Returns:
        None
    """
    if not sparse_where_condition:
        sparse_where_condition = ''
    plpy.execute("""
        CREATE TABLE {out_table} as
        SELECT
            {row_id},
            {col_id},
            {value}
        FROM {source_table}
        WHERE {value} is not NULL
        {sparse_where_condition}
        """.format(row_id=row_id,
                   col_id=col_id,
                   value=value,
                   source_table=source_table,
                   out_table=out_table,
                   sparse_where_condition=sparse_where_condition))
    # Pad with a zero entry at (row_dim, col_dim) if the copied data does not
    # already span the requested dimensions
    res_row_dim, res_col_dim = get_dims(out_table, {'row': row_id,
                                                    'col': col_id,
                                                    'val': value})
    if res_row_dim != row_dim or res_col_dim != col_dim:
        plpy.execute("""
            INSERT INTO {out_table} VALUES ({row_dim}, {col_dim}, 0)
            """.format(**locals()))
| |
| |
def _is_sparse(matrix_in, val):
    """A matrix table is sparse iff its value column is not an array type."""
    dense = is_col_array(matrix_in, val)
    return not dense
| |
| |
| def _matrix_default_args(): |
| return { |
| 'row': 'row_num', |
| 'col': 'col_num', |
| 'val': 'val', |
| 'trans': False |
| } |
| |
| |
def parse_matrix_args(matrix_args, in_default_args=None):
    """ Parse name-value pair string for a matrix
    Args:
        @param matrix_args: dict or str.
                            A dict is returned unchanged.
                            A string must contain "key=value" pairs.
                            Supported keys:
                                row = Name of the column containing row id
                                col = Name of the column containing column id
                                val = Name of the column containing values
                                trans = Boolean indicating if a matrix should be
                                        transposed before operation
                                fmt = Format of the output table
        @param in_default_args: dict, Optional overrides for the defaults
    Returns:
        Dictionary mapping each option name to its (parsed) value.
    Raises:
        ValueError if matrix_args is neither falsy, a dict, nor a str.
    """
    defaults = _matrix_default_args()
    if in_default_args:
        defaults.update(in_default_args)
    if not matrix_args:
        return defaults
    if isinstance(matrix_args, dict):
        return matrix_args
    if not isinstance(matrix_args, str):
        raise ValueError("Matrix error: invalid input for matrix args")
    parsed = extract_keyvalue_params(
        matrix_args,
        {'row': str, 'col': str, 'val': str, 'trans': bool, 'fmt': str},
        defaults)
    fmt = parsed.get('fmt')
    if fmt:
        # normalize and validate the output-format option
        fmt = fmt.lower()
        parsed['fmt'] = fmt
        _assert(fmt in ('sparse', 'dense'),
                "Matrix error: invalid input for matrix args 'fmt', it should be 'dense'"
                " or 'sparse' but provided input is '{0}'".format(fmt))
    return parsed
| |
| |
def cast_dense_input_table_to_correct_columns(schema_madlib, matrix_in,
                                              matrix_out, row_id):
    """Ensure a dense matrix table uses the canonical (row_id, row_vec) layout.

    If matrix_in already uses the expected column names, no table is created.
    Otherwise matrix_out is created holding the same data in the required
    layout (renaming columns, or folding one-column-per-value tables through
    __matrix_column_to_array_format).

    Returns:
        bool: True if matrix_out was created, False otherwise.
    """
    _validate_output_table(matrix_out)
    _validate_input_table(matrix_in)
    cols = get_cols(matrix_in, schema_madlib)
    created = False
    if len(cols) == 2:
        # Two columns: the row id plus (hopefully) an array-valued data column
        cols.remove(row_id)
        data_col = cols[0]
        if not is_col_array(matrix_in, data_col):
            plpy.error("SVD error: Data column should be of type array!")
        if data_col != "row_vec" or row_id != "row_id":
            # rename columns into the canonical layout
            plpy.execute(
                """
                CREATE TABLE {matrix_out} as
                SELECT {row_id} as row_id, {vec} as row_vec
                FROM {matrix_in}
                """.format(matrix_out=matrix_out,
                           row_id=row_id, vec=data_col, matrix_in=matrix_in))
            created = True
    else:
        # one column per matrix column: pack them into a single array column
        plpy.execute(
            """
            SELECT {schema_madlib}.__matrix_column_to_array_format (
                '{matrix_in}', '{row_id}', '{matrix_out}', False)
            """.format(schema_madlib=schema_madlib, matrix_in=matrix_in,
                       row_id=row_id,
                       matrix_out=matrix_out))
        created = True
    return created
| |
| |
def matrix_ndims(schema_madlib, matrix_in, in_args, is_block):
    """Validate a matrix table and return its (row_dim, col_dim) dimensions."""
    _validate_input_table(matrix_in)
    parsed = parse_matrix_args(in_args)
    if is_block:
        _validate_block(matrix_in, parsed)
    else:
        if _is_sparse(matrix_in, parsed['val']):
            checker = validate_sparse
        else:
            checker = validate_dense
        checker(matrix_in, parsed)
    return get_dims(matrix_in, parsed, is_block)
| |
| |
def get_dims(matrix, matrix_args, is_block=False):
    """Return (row_dim, col_dim) of a matrix stored in any supported format.

    Args:
        @param matrix: str, Name of the table containing the matrix
        @param matrix_args: dict, Column-name options ('row', 'col', 'val')
        @param is_block: bool, True if the table stores a block matrix

    Returns:
        Tuple (row_dim, col_dim). For a sparse matrix with no non-NULL
        entries this is (0, 0).
    """
    if is_block:
        # Block format: total dim = (#blocks - 1) * full-block dim + dim of
        # the last (possibly partial) block.
        block_dim = plpy.execute("""
            SELECT
                max({matrix_args[row]}) as max_row_id,
                max({matrix_args[col]}) as max_col_id
            FROM {matrix}
            """.format(matrix=matrix, matrix_args=matrix_args))[0]
        # dimensions of the top-left block (taken as the full block size)
        max_block_size = plpy.execute("""
            SELECT
                (array_upper({matrix_args[val]}, 1) -
                    array_lower({matrix_args[val]}, 1) + 1) AS block_row_dim,
                (array_upper({matrix_args[val]}, 2) -
                    array_lower({matrix_args[val]}, 2) + 1) AS block_col_dim
            FROM {matrix}
            WHERE {matrix_args[row]} = 1 AND {matrix_args[col]} = 1
            """.format(matrix=matrix, matrix_args=matrix_args))[0]
        # dimensions of the bottom-right block (possibly smaller)
        min_block_size = plpy.execute("""
            SELECT
                (array_upper({matrix_args[val]}, 1) -
                    array_lower({matrix_args[val]}, 1) + 1) AS block_row_dim,
                (array_upper({matrix_args[val]}, 2) -
                    array_lower({matrix_args[val]}, 2) + 1) AS block_col_dim
            FROM {matrix}
            WHERE {matrix_args[row]} = {max_row_id} and
                  {matrix_args[col]} = {max_col_id}
            """.format(matrix=matrix, matrix_args=matrix_args,
                       max_row_id=block_dim['max_row_id'],
                       max_col_id=block_dim['max_col_id']))[0]

        # all blocks will be the maximum size except the last one
        row_dim = (block_dim['max_row_id'] - 1) * max_block_size['block_row_dim'] + min_block_size['block_row_dim']
        col_dim = (block_dim['max_col_id'] - 1) * max_block_size['block_col_dim'] + min_block_size['block_col_dim']
    elif _is_sparse(matrix, matrix_args['val']):
        # Sparse format: dimensions are the maximum row/col index present
        # among non-NULL entries.
        cols = [matrix_args[i] for i in ('row', 'col', 'val')]
        _assert(columns_exist_in_table(matrix, cols),
                "Matrix error: At least one of the columns ({1}) missing "
                "from matrix {0}".format(matrix, ','.join(cols)))
        rv = plpy.execute("""
            SELECT
                max({matrix_args[row]}::BIGINT) AS row_dim,
                max({matrix_args[col]}::BIGINT) AS col_dim
            FROM {matrix}
            WHERE {matrix_args[val]} IS NOT NULL
            """.format(matrix=matrix, matrix_args=matrix_args))
        row_dim = rv[0]['row_dim'] if rv[0]['row_dim'] is not None else 0
        col_dim = rv[0]['col_dim'] if rv[0]['col_dim'] is not None else 0
    else:
        # Dense format: row_dim is the max row index, col_dim the (unique)
        # length of the row vectors.
        cols = [matrix_args[i] for i in ('row', 'val')]
        _assert(columns_exist_in_table(matrix, cols),
                "Matrix error: At least one of the columns ({1}) missing "
                "from matrix {0}".format(matrix, ','.join(cols)))
        rv = plpy.execute("""
            SELECT max({matrix_args[row]}::BIGINT) AS row_dim
            FROM {matrix}
            """.format(matrix=matrix, matrix_args=matrix_args))
        row_dim = rv[0]['row_dim'] if rv[0]['row_dim'] is not None else 0
        if row_dim != 0:
            rv = plpy.execute("""
                SELECT
                    DISTINCT(array_upper({matrix_args[val]}::float8[], 1) -
                             array_lower({matrix_args[val]}::float8[], 1) + 1)
                        AS col_dim
                FROM
                    {matrix}
                """.format(matrix=matrix, matrix_args=matrix_args))
            # DISTINCT must yield exactly one length, else rows are ragged
            _assert(rv and len(rv) == 1,
                    "dimensions mismatch: row_array.size() != vec.size(). "
                    "Data contains different sized arrays")
            col_dim = rv[0]['col_dim']
        else:
            col_dim = 0
    return (row_dim, col_dim)
| |
| # ------------------------------------------------------------------------------ |
| # -- Validation functions ------------------------------------------------------ |
| # ------------------------------------------------------------------------------ |
| |
| |
def _validate_input_array(diag_elements):
    """Raise if the diagonal-elements array is missing or empty."""
    _assert(diag_elements is not None, "Matrix error: Invalid diag elements")
    _assert(len(diag_elements) != 0, "Matrix error: Empty diag elements")
| |
| |
def _validate_output_table(table):
    """Raise if `table` is a blank/None name or already exists."""
    has_name = table is not None and table.replace('"', '').strip() != ''
    _assert(has_name,
            "Matrix error: Invalid output table ({0})".format(table))
    _assert(not table_exists(table),
            "Matrix error: The output table ({0}) already exists".format(table))
| |
| |
def _validate_input_table(table):
    """Raise if `table` is a blank/None name or does not exist."""
    has_name = table is not None and table.replace('"', '').strip() != ''
    _assert(has_name,
            "Matrix error: Invalid input table ({0})".format(table))
    _assert(table_exists(table),
            "Matrix error: The input table ({0}) doesn't exist".format(table))
| |
| |
def validate_matrix(matrix, matrix_args, check_col=True):
    """Dispatch to the sparse or dense validator based on the value column."""
    if _is_sparse(matrix, matrix_args['val']):
        checker = validate_sparse
    else:
        checker = validate_dense
    checker(matrix, matrix_args, check_col=check_col)
| |
| |
def validate_sparse(matrix, matrix_args, check_col=True):
    """Validate a sparse (row, col, val) matrix table.

    Checks that the table exists and is non-empty; optionally that the
    expected columns exist and the value column is scalar (not an array);
    that all indices fall in [1, row_dim] x [1, col_dim]; and that no
    (row, col) pair with a non-NULL value appears more than once.

    Args:
        @param matrix: str, Name of the table containing the sparse matrix
        @param matrix_args: dict, Column-name options ('row', 'col', 'val')
        @param check_col: bool, If True also verify column existence/types
    Returns:
        None. Raises via _assert/plpy.error on invalid input.
    """
    _assert(table_exists(matrix),
            "Matrix error: Could not find input table " + matrix)
    _assert(not table_is_empty(matrix),
            "Matrix error: The input table ({0}) is empty".format(matrix))
    if check_col:
        _assert(columns_exist_in_table(matrix,
                                       [matrix_args[i] for i in ('row', 'col', 'val')]),
                "Matrix error: Missing columns from matrix {0}".format(matrix))
        _assert(not is_col_array(matrix, matrix_args['val']),
                "Matrix error: invalid column ({0}) should not be an array "
                "datatype".format(matrix_args['val']))

    # verify that row and col entries are in the right range
    row_dim, col_dim = get_dims(matrix, matrix_args)
    invalid_rows = plpy.execute("""
        -- Check if only row indices in [1, row_dim] and
        -- col_indices in [1, col_dim] are present
        SELECT {matrix_args[row]} as r, {matrix_args[col]} as c
        FROM
            {matrix}
        WHERE
            {matrix_args[row]} < 1 OR
            {matrix_args[col]} < 1 OR
            {matrix_args[row]} > {row_dim} OR
            {matrix_args[col]} > {col_dim}
        ORDER BY r, c;
        """.format(**locals()))
    if invalid_rows:
        plpy.error("Matrix error: Invalid index in matrix ({0}). "
                   "All row and column indices must be in the range [1, DIM]".format(matrix))

    # verify that the matrix row_id, col_id elements are unique
    # (dead `c = unique_string()` assignment removed: the SQL below uses a
    # literal alias `c`, not the generated name)
    non_unique_rows = plpy.execute("""
        SELECT {matrix_args[row]} as r, {matrix_args[col]} as c
        FROM {matrix}
        WHERE {matrix_args[val]} IS NOT NULL
        GROUP BY {matrix_args[row]}, {matrix_args[col]}
        HAVING count(*) > 1
        ORDER BY r, c
        """.format(**locals()))
    if non_unique_rows:
        MAX_ROWS = 20
        bad_rows = [str(i['r']) + " | " + str(i['c']) for i in non_unique_rows[:MAX_ROWS]]
        plpy.error(
            """Matrix error: Following entries are duplicated in sparse matrix ({m}).
            (Displaying a maximum of {M_R} entries)
            {row} | {col}
            -------+---------------------------
            {all_rows}

            Error: The above entries (only {M_R} shown) were duplicated in sparse matrix ({m})
            """.format(m=matrix,
                       row=matrix_args['row'],
                       col=matrix_args['col'],
                       all_rows="\n".join(bad_rows),
                       M_R=MAX_ROWS))
| |
| |
def validate_dense(matrix, matrix_args, check_col=True, row_dim=None):
    """Validate a dense (row, vector) matrix table.

    Checks that the table exists; optionally that the row/val columns exist
    and the value column is an array; and that the row ids are exactly the
    set {1..row_dim} with no duplicates and no gaps.

    Args:
        @param matrix: str, Name of the table containing the dense matrix
        @param matrix_args: dict, Column-name options ('row', 'val')
        @param check_col: bool, If True also verify column existence/types
        @param row_dim: int, Known row count; computed from the data if None
    Returns:
        None. Raises via _assert/plpy.error on invalid input.
    """
    _assert(table_exists(matrix), "Matrix error: Could not find table {0}".format(matrix))
    if check_col:
        cols = [matrix_args[i] for i in ('row', 'val')]
        _assert(columns_exist_in_table(
                matrix, cols),
                "Matrix error: Missing columns (one of {0}) "
                "from matrix {1}".format(','.join(cols), matrix))
        _assert(is_col_array(matrix, matrix_args['val']),
                "Matrix error: invalid column ({0}) should be an array "
                "datatype".format(matrix_args['val']))

    # verify that the matrix row_id, col_id elements are unique and not missing
    if row_dim is None:
        row_dim = plpy.execute("SELECT max({0}::bigint) as max_row_dim FROM {1}".
                               format(matrix_args['row'], matrix))[0]['max_row_dim']
    # unique identifier used as the alias of the generated row-id series below
    c = unique_string()
    non_unique_rows = plpy.execute("""
        -- Check duplicates
        SELECT {matrix_args[row]}::BIGINT as r
        FROM {matrix}
        GROUP BY {matrix_args[row]}
        HAVING (count(*) > 1)

        UNION

        -- Check if all (and only) indices in [1, row_dim] are present
        SELECT coalesce({c}::BIGINT, {matrix_args[row]}::BIGINT) as r
        FROM (
            SELECT {matrix_args[row]}::BIGINT, {c}
            FROM
                {matrix}
                FULL OUTER JOIN
                generate_series(1, {row_dim}) as {c}
                ON ({matrix_args[row]}::BIGINT = {c}::bigint)
        ) q
        WHERE
            {matrix_args[row]} is NULL OR
            {c} is NULL
        ORDER BY r;
        """.format(**locals()))
    if non_unique_rows:
        MAX_ROWS = 20
        bad_rows = [str(i['r']) for i in non_unique_rows[:MAX_ROWS]]
        plpy.error(
            """Matrix error: Following {row} values are invalid in dense matrix ({m}).

            (Displaying a maximum of {M_R} entries)
            {row}
            ------------
            {all_rows}

            Error: Above {row} values are invalid in dense matrix ({m}).
            """.format(m=matrix,
                       row=matrix_args['row'],
                       all_rows="\n".join(bad_rows),
                       M_R=MAX_ROWS))
| |
| |
def _validate_block(matrix, matrix_args):
    """Validate that `matrix` is a non-empty block-format matrix table."""
    _assert(table_exists(matrix),
            "The input table {0} doesn't exist".format(matrix))
    block_cols = [matrix_args['row'], matrix_args['col'], matrix_args['val']]
    _assert(columns_exist_in_table(matrix, block_cols),
            "Matrix error: One or more input columns ({0})"
            " don't exist in the table ({1}) representing a block matrix".
            format(','.join(block_cols), matrix))
    _assert(is_col_array(matrix, matrix_args['val']),
            "Matrix error: Invalid block column - array expected")
    _assert(not table_is_empty(matrix),
            "Matrix error: Input table {0} is empty".format(matrix))
| |
| # ------------------------------------------------------------------------------ |
| # -- Transformation operations ------------------------------------------------- |
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_sparsify(schema_madlib, matrix_in, in_args, matrix_out, out_args):
    """Convert a dense matrix table into the sparse (row, col, val) format.

    Zero entries are dropped. If the resulting table's dimensions differ from
    the input's (because the last row/column was all zero), a sentinel entry
    (row_dim, col_dim, 0) is inserted to preserve the dimensions.

    Args:
        @param schema_madlib: str, Name of the MADlib schema (unused)
        @param matrix_in: str, Name of the table containing the dense matrix
        @param in_args: str/dict, Options ('row', 'val') for matrix_in
        @param matrix_out: str, Name of the sparse output table to create
        @param out_args: str/dict, Options for matrix_out
    Returns:
        None
    """
    _ = schema_madlib  # unused parameter - retained for consistent API
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)
    in_args = parse_matrix_args(in_args)
    (row_dim, col_dim) = get_dims(matrix_in, in_args)
    validate_dense(matrix_in, in_args, row_dim=row_dim)

    default_args = {'row': in_args['row'], 'val': in_args['val']}
    if 'col' in in_args:
        default_args['col'] = in_args['col']
    out_args = parse_matrix_args(out_args, in_default_args=default_args)
    # temp identifiers for the unnested column index and value
    temp_col_id, temp_val = unique_string(), unique_string()
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
                 `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {in_args[row]} as {out_args[row]},
            {temp_col_id} as {out_args[col]},
            {temp_val} as {out_args[val]}
        FROM
        (
            SELECT
                {in_args[row]},
                unnest({in_args[val]}) AS {temp_val},
                generate_series(1, {col_dim}) AS {temp_col_id}
            FROM
                {matrix_in}
        ) t1
        WHERE
            {temp_val} <> 0
        m4_ifdef(`__POSTGRESQL__', `',
                 `DISTRIBUTED BY ({out_args[row]})')
        """.format(**locals()))
    # Re-insert a zero sentinel if dropping zeros shrank the extent
    res_row_dim, res_col_dim = get_dims(matrix_out, out_args)
    if res_row_dim != row_dim or res_col_dim != col_dim:
        plpy.execute("""
            INSERT INTO {matrix_out} VALUES ({row_dim}, {col_dim}, 0)
            """.format(matrix_out=matrix_out, row_dim=row_dim, col_dim=col_dim))
| |
| |
def matrix_densify(schema_madlib, matrix_in, in_args, matrix_out, out_args):
    """Convert a sparse (row, col, val) matrix table into the dense format.

    Each matrix row becomes one output row with its values packed into an
    array column; rows absent from the sparse input are emitted as all-zero
    vectors.

    Args:
        @param schema_madlib: str, Name of the MADlib schema
        @param matrix_in: str, Name of the table containing the sparse matrix
        @param in_args: str/dict, Options ('row', 'col', 'val') for matrix_in
        @param matrix_out: str, Name of the dense output table to create
        @param out_args: str/dict, Options for matrix_out
    Returns:
        None
    """
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)
    in_args = parse_matrix_args(in_args)
    validate_sparse(matrix_in, in_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': in_args['row'],
                                                  'val': in_args['val']})
    (row_dim, col_dim) = get_dims(matrix_in, in_args)
    # Each dense row is an array of col_dim elements; guard against arrays
    # larger than the platform limit. (sys.maxsize replaces the Python-2-only
    # sys.maxint; the message is also corrected to refer to columns.)
    _assert(col_dim < sys.maxsize,
            "Matrix error: Matrix {0} has too many columns. This cannot be "
            "represented in a dense format due to "
            "restrictions on maximum array size.".format(matrix_in))
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
                 `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {in_args[row]} AS {out_args[row]},
            {schema_madlib}.__matrix_densify_agg(
                {col_dim}::integer,
                {in_args[col]}::integer,
                {in_args[val]}) AS {out_args[val]}
        FROM
            {matrix_in}
        WHERE
            {in_args[val]} IS NOT NULL
        GROUP BY
            {in_args[row]}
        m4_ifdef(`__POSTGRESQL__', `',
                 `DISTRIBUTED BY ({out_args[row]})')
        """.format(**locals()))

    # Add vector with all zeros for rows not present in the sparse matrix
    plpy.execute("""
        INSERT INTO {matrix_out}
        SELECT row as {out_args[row]}, val as {out_args[val]}
        FROM
        (
            SELECT row
            FROM generate_series(1, {row_dim}) AS row
            WHERE row NOT IN
            (
                SELECT {in_args[row]} as row
                FROM {matrix_in}
                GROUP BY {in_args[row]}
            )
        ) t1,
        (
            SELECT array_agg(val * 0) AS val
            FROM generate_series(1, {col_dim}) AS val
        ) t2
        """.format(**locals()))
| |
| # ------------------------------------------------------------------------------ |
| # -- Element-wise operations --------------------------------------------------- |
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_elem_op(schema_madlib, matrix_a, a_args,
                   matrix_b, b_args, matrix_out, out_args,
                   elem_ops):
    """ Perform element-wise operation on two matrices.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_a: str, Name of the table containing 1st (left) matrix
        @param a_args: str, Name-value pair string containing options for matrix_a
        @param matrix_b: str, Name of the table containing 2nd (right) matrix
        @param b_args: str, Name-value pair string containing options for matrix_b
        @param matrix_out: str, Name of the table to store result matrix
        @param out_args: str, Name-value pair string containing options for matrix_out
        @param elem_ops: dict, Dictionary containing the operation to perform.
                        Requires two parameters:
                            scalar_op: The operation between each pair of
                                        elements (Used for sparse matrices)
                            vector_op: The operation between each pair of
                                        rows (Used for dense matrices)

    Returns:
        None
    Side effect:
        Creates an output table containing the result matrix
    """
    _validate_input_table(matrix_a)
    _validate_input_table(matrix_b)
    _validate_output_table(matrix_out)

    a_args = parse_matrix_args(a_args)
    b_args = parse_matrix_args(b_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': a_args['row'],
                                                  'col': a_args['col'],
                                                  'val': a_args['val']})
    a_dim = get_dims(matrix_a, a_args)
    b_dim = get_dims(matrix_b, b_args)
    _assert(a_dim[0] == b_dim[0] and a_dim[1] == b_dim[1],
            "Matrix error: The dimensions of the two matrices don't match")
    # requested output format ('sparse'/'dense'), if the caller gave one
    fmt = out_args['fmt'] if 'fmt' in out_args else None
    is_a_sparse = _is_sparse(matrix_a, a_args['val'])
    is_b_sparse = _is_sparse(matrix_b, b_args['val'])

    transform_a_func, transform_b_func = None, None
    if is_a_sparse == is_b_sparse:
        # if both input formats are same, then directly apply operation ...
        elem_op_func = _matrix_elem_op_sparse if is_a_sparse else _matrix_elem_op_dense
        # ... and transform result if fmt is provided and not same format as input
        to_transform_rst = fmt and ((fmt == "sparse") != is_a_sparse)
        # transform_rst_func matters only if to_transform_rst == True
        transform_rst_func = matrix_densify if is_a_sparse else matrix_sparsify
    else:
        # Different format inputs - change the format for one of the matrices.
        # if output is sparse (default), change dense input to sparse
        # if output is dense, change sparse input to dense
        # In this case we don't need to transform output since it is directly
        # produced in necessary format
        to_transform_rst = False
        is_output_sparse = not fmt or fmt == "sparse"
        elem_op_func = _matrix_elem_op_sparse if is_output_sparse else _matrix_elem_op_dense
        if is_output_sparse != is_a_sparse:
            # output and matrix_a are opposite formats ...
            transform_a_func = matrix_densify if is_a_sparse else matrix_sparsify
        else:
            # ... or if a is in the right format, then b is in the wrong format
            transform_b_func = matrix_densify if is_b_sparse else matrix_sparsify

    # scratch table holding the format-converted input, when one is needed
    matrix_out1 = "pg_temp." + unique_string() + "_a"
    if not transform_a_func and not transform_b_func:
        validate_matrix(matrix_a, a_args)
        validate_matrix(matrix_b, b_args)
    elif transform_a_func:
        # transform function internally validates
        transform_a_func(schema_madlib, matrix_a, a_args, matrix_out1, a_args)
        matrix_a = matrix_out1
    else:
        transform_b_func(schema_madlib, matrix_b, b_args, matrix_out1, b_args)
        matrix_b = matrix_out1

    matrix_out2 = "pg_temp." + unique_string() + "_r"
    matrix_out_tmp = matrix_out2 if to_transform_rst else matrix_out
    elem_op_func(schema_madlib, matrix_a, a_args,
                 matrix_b, b_args, matrix_out_tmp, out_args, elem_ops)
    if to_transform_rst:
        transform_rst_func(schema_madlib, matrix_out_tmp, out_args,
                           matrix_out, out_args)
        plpy.execute("DROP TABLE IF EXISTS " + matrix_out_tmp)
    # NOTE(review): matrix_out1 (a pg_temp table) is never dropped here —
    # presumably session-temp cleanup is relied upon; confirm.
| |
| |
def _matrix_elem_op_sparse(schema_madlib, matrix_a, a_args,
                           matrix_b, b_args, matrix_out, out_args,
                           elem_ops):
    """Apply elem_ops['scalar_op'] element-wise to two sparse matrices.

    A full outer join pairs entries by (row, col); a missing entry on either
    side is treated as 0. Results equal to 0 are dropped, and a sentinel
    entry (row_dim, col_dim, 0) is inserted afterwards if dropping them
    shrank the output's extent relative to matrix_a's dimensions.
    """
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
                 `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {out_args[row]},
            {out_args[col]},
            {out_args[val]}
        FROM (
            SELECT
                coalesce(a.{a_args[row]}::bigint, b.{b_args[row]}::bigint) AS {out_args[row]},
                coalesce(a.{a_args[col]}::bigint, b.{b_args[col]}::bigint) AS {out_args[col]},
                coalesce(a.{a_args[val]}::float8, 0::float8)
                    {elem_ops[scalar_op]}
                coalesce(b.{b_args[val]}::float8, 0::float8) AS {out_args[val]}
            FROM
                (SELECT * FROM {matrix_a} WHERE {a_args[val]} IS NOT NULL) AS a
                FULL OUTER JOIN
                (SELECT * FROM {matrix_b} WHERE {b_args[val]} IS NOT NULL) AS b
                ON
                    a.{a_args[row]}::bigint = b.{b_args[row]}::bigint AND
                    a.{a_args[col]}::bigint = b.{b_args[col]}::bigint
        ) t
        WHERE
            t.{out_args[val]}::float8 != 0
        m4_ifdef(`__POSTGRESQL__', `',
                 `DISTRIBUTED BY ({out_args[row]})')
        """.format(**locals()))
    # Preserve the extent of the result if all boundary entries were zero
    a_dim = get_dims(matrix_a, a_args)
    res_row_dim, res_col_dim = get_dims(matrix_out, out_args)
    if res_row_dim != a_dim[0] or res_col_dim != a_dim[1]:
        plpy.execute("""
            INSERT INTO {matrix_out} VALUES ({row_count}, {col_count}, 0)
            """.format(matrix_out=matrix_out,
                       row_count=a_dim[0],
                       col_count=a_dim[1]))
| |
| |
def _matrix_elem_op_dense(schema_madlib, matrix_a, a_args,
                          matrix_b, b_args, matrix_out, out_args,
                          elem_ops):
    """Apply elem_ops['vector_op'] row-wise to two dense matrices.

    Rows are matched on their row-id columns (inner join) and the MADlib
    vector function named by elem_ops['vector_op'] combines the two float8
    arrays of each matched pair.
    """
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
                 `WITH (APPENDONLY=TRUE)') AS
        SELECT
            a.{a_args[row]} AS {out_args[row]},
            {schema_madlib}.{elem_ops[vector_op]}(
                a.{a_args[val]}::float8[],
                b.{b_args[val]}::float8[]) AS {out_args[val]}
        FROM
            {matrix_a} AS a,
            {matrix_b} AS b
        WHERE
            a.{a_args[row]}::bigint = b.{b_args[row]}::bigint
        m4_ifdef(`__POSTGRESQL__', `',
                 `DISTRIBUTED BY ({out_args[row]})')
        """.format(**locals()))
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_add(schema_madlib, matrix_a, a_args,
               matrix_b, b_args, matrix_out, out_args):
    """Element-wise addition: matrix_out = matrix_a + matrix_b."""
    matrix_elem_op(schema_madlib, matrix_a, a_args,
                   matrix_b, b_args, matrix_out, out_args,
                   {'scalar_op': '+', 'vector_op': 'array_add'})
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_sub(schema_madlib, matrix_a, a_args,
               matrix_b, b_args, matrix_out, out_args):
    """Element-wise subtraction: matrix_out = matrix_a - matrix_b."""
    matrix_elem_op(schema_madlib, matrix_a, a_args,
                   matrix_b, b_args, matrix_out, out_args,
                   {'scalar_op': '-', 'vector_op': 'array_sub'})
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_extract_diag(schema_madlib, matrix_in, in_args):
    """ Return the main diagonal of a matrix.

    Args:
        @param schema_madlib: Name of the MADlib schema
        @param matrix_in: str, Name of the table containing the matrix
        @param in_args: str or dict, Column-name options for matrix_in.
                        A "key1=value1, key2=value2 ..." string is parsed;
                        a dict is used as-is.
    Returns:
        float[]: the main diagonal elements of the matrix
    """
    _validate_input_table(matrix_in)
    parsed = parse_matrix_args(in_args)
    if _is_sparse(matrix_in, parsed['val']):
        validate_sparse(matrix_in, parsed)
        extractor = _matrix_extract_diag_sparse
    else:
        validate_dense(matrix_in, parsed)
        extractor = _matrix_extract_diag_dense
    return extractor(schema_madlib, matrix_in, parsed)
| |
| |
def _matrix_extract_diag_sparse(schema_madlib, matrix_in, in_args):
    """Return the main diagonal of a sparse matrix as a float array.

    Diagonal positions absent from the sparse table become 0 via the right
    outer join against generate_series(1, min(row_dim, col_dim)).
    """
    in_dim = get_dims(matrix_in, in_args)
    # diagonal length is min(row_dim, col_dim)
    out_num = min(in_dim[0], in_dim[1])
    res = plpy.execute("""
        SELECT
            array_agg(val order by row) AS array_diag
        FROM (
            SELECT coalesce(val,0) AS val,
                   num AS row
            FROM (
                SELECT
                    {in_args[val]} AS val,
                    {in_args[row]}::BIGINT AS row
                FROM
                    {matrix_in}
                WHERE
                    {in_args[row]} = {in_args[col]}
            ) AS t1
            RIGHT OUTER JOIN
            generate_series(1, {out_num}) num
            ON t1.row = num
        ) t2
        """.format(**locals()))[0]['array_diag']
    return res
| # ------------------------------------------------------------------------------ |
| |
| |
def _matrix_extract_diag_dense(schema_madlib, matrix_in, in_args):
    """Return the main diagonal of a dense matrix as a float array.

    Indexes each row vector at its own row number (val[row]); NULL entries
    are filtered out before aggregation.
    """
    in_dim = get_dims(matrix_in, in_args)
    # diagonal length is min(row_dim, col_dim)
    out_num = min(in_dim[0], in_dim[1])
    res = plpy.execute("""
        SELECT
            array_agg(val order by row) AS array_diag
        FROM (
            SELECT
                {in_args[val]}[{in_args[row]}] AS val,
                {in_args[row]}::BIGINT AS row
            FROM
                {matrix_in}
            WHERE
                {in_args[row]} <= {out_num}
        ) AS t2
        WHERE val is not NULL
        """.format(**locals()))[0]['array_diag']
    return res
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_elem_mult(schema_madlib, matrix_a, a_args,
                     matrix_b, b_args, matrix_out, out_args):
    """Element-wise (Hadamard) product: matrix_out = matrix_a .* matrix_b."""
    matrix_elem_op(schema_madlib, matrix_a, a_args,
                   matrix_b, b_args, matrix_out, out_args,
                   {'scalar_op': '*', 'vector_op': 'array_mult'})
| |
| # ------------------------------------------------------------------------------ |
| # -- Block operations ------------------------------------------------------ |
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_block_mult(schema_madlib, matrix_a, a_args,
                      matrix_b, b_args,
                      matrix_out, out_args):
    """Multiply two matrices stored in block format.

    Blocks a(i,k) and b(k,j) are multiplied pairwise with matrix_mem_mult and
    the partial products are summed per (i,j) with __matrix_mem_sum.

    Args:
        @param schema_madlib: str, Name of the MADlib schema
        @param matrix_a: str, Table containing the left block matrix
        @param a_args: str/dict, Options ('row', 'col', 'val') for matrix_a
        @param matrix_b: str, Table containing the right block matrix
        @param b_args: str/dict, Options ('row', 'col', 'val') for matrix_b
        @param matrix_out: str, Name of the output table to create
        @param out_args: str/dict, Options for matrix_out
    Returns:
        None
    Side effect:
        Creates matrix_out containing the block-formatted product.
    """
    _validate_input_table(matrix_a)
    _validate_input_table(matrix_b)
    _validate_output_table(matrix_out)

    a_args = parse_matrix_args(a_args)
    b_args = parse_matrix_args(b_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': a_args['row'],
                                                  'col': a_args['col'],
                                                  'val': a_args['val']})

    _validate_block(matrix_a, a_args)
    _validate_block(matrix_b, b_args)
    # BUGFIX: the right operand of matrix_mem_mult previously read
    # b.{a_args[val]} — wrong whenever matrix_b's value column is named
    # differently from matrix_a's. It now uses b.{b_args[val]}.
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
                 `WITH (APPENDONLY=TRUE)') AS
        SELECT
            row as {out_args[row]},
            col as {out_args[col]},
            {schema_madlib}.__matrix_mem_sum(block::float8[]) AS {out_args[val]}
        FROM
        (
            SELECT
                a.{a_args[row]} AS row,
                b.{b_args[col]} AS col,
                {schema_madlib}.matrix_mem_mult(a.{a_args[val]}::float8[],
                                                b.{b_args[val]}::float8[]) AS block
            FROM
                {matrix_a} AS a JOIN {matrix_b} AS b
                ON (a.{a_args[col]}::bigint = b.{b_args[row]}::bigint)
        ) t1
        GROUP BY row, col
        m4_ifdef(`__POSTGRESQL__', `',
                 `DISTRIBUTED BY ({out_args[row]})')
        """.format(**locals()))
| |
| |
def matrix_block_square(schema_madlib, matrix_in, in_args,
                        matrix_out, out_args):
    """Square a block matrix by multiplying it with itself."""
    matrix_block_mult(schema_madlib,
                      matrix_in, in_args,
                      matrix_in, in_args,
                      matrix_out, out_args)
| |
| |
def matrix_block_trans(schema_madlib, matrix_in, in_args, matrix_out, out_args):
    """ Transpose a matrix stored in block format.

    Block coordinates are swapped (row label becomes col label and vice
    versa) and each block's contents are transposed in memory via
    matrix_mem_trans.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Name of the table containing the block matrix
        @param in_args: str, Name-value pair string containing options for matrix_in
        @param matrix_out: str, Name of the output table
        @param out_args: str, Name-value pair string containing options for matrix_out

    Returns:
        None
    Side effect:
        Creates matrix_out containing the transpose of matrix_in
    """
    in_args = parse_matrix_args(in_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': in_args['row'],
                                                  'col': in_args['col'],
                                                  'val': in_args['val']})
    _validate_input_table(matrix_in)
    _validate_block(matrix_in, in_args)
    _validate_output_table(matrix_out)

    # Note the swapped aliases: input row -> output col, input col -> output row.
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {in_args[row]} AS {out_args[col]},
            {in_args[col]} AS {out_args[row]},
            {schema_madlib}.matrix_mem_trans({in_args[val]}::float8[]) AS {out_args[val]}
        FROM
            {matrix_in}
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({out_args[col]})')
        """.format(**locals()))
| |
| |
def matrix_blockize(schema_madlib, matrix_in, in_args,
                    row_dim, col_dim, matrix_out, out_args):
    """ Convert a dense (row, vector) matrix into block format.

    Each input row vector is split into chunks of col_dim elements
    (__matrix_row_split) and groups of row_dim consecutive rows are folded
    into 2-D blocks (__matrix_blockize_agg), keyed by block coordinates.
    Trailing blocks may be ragged: the last band of blocks has only
    `residual` rows.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Name of the table containing the dense matrix
        @param in_args: str, Name-value pair string containing options for matrix_in
        @param row_dim: int, Number of rows per block (> 0)
        @param col_dim: int, Number of columns per block (> 0)
        @param matrix_out: str, Name of the output table (dropped first if it exists)
        @param out_args: str, Name-value pair string containing options for matrix_out

    Returns:
        None
    Side effect:
        Creates matrix_out in block format
    """
    _assert(row_dim > 0 and col_dim > 0, 'Matrix error: invalid block dimension')
    # Each block is materialized as a single array, so its element count is
    # bounded by the platform word size (Python 2 sys.maxint).
    _assert(row_dim * col_dim < sys.maxint,
            "Matrix error: Block size requested ({0}) is "
            "too large".format(row_dim * col_dim))
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)

    in_args = parse_matrix_args(in_args)
    validate_dense(matrix_in, in_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': in_args['row'],
                                                  'val': in_args['val']})

    rv = plpy.execute('SELECT count(*) AS total_row FROM ' + matrix_in)
    total_row = rv[0]['total_row']
    # border_row: last row id covered by full-height blocks (integer division);
    # rows past it form blocks of height `residual`.
    residual = total_row % row_dim
    border_row = (total_row / row_dim) * row_dim

    plpy.execute('DROP TABLE IF EXISTS ' + matrix_out)
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            brow_id AS {out_args[row]},
            bcol_id AS {out_args[col]},
            {schema_madlib}.__matrix_blockize_agg(
                row_id::integer,
                row_vec::float8[],
                row_dim::integer) AS {out_args[val]}
        FROM
        (
            SELECT
                {in_args[row]} as row_id,
                (({in_args[row]}::bigint - 1) / {row_dim}::bigint) + 1 AS brow_id,
                CASE WHEN {in_args[row]}::bigint <= {border_row}
                    THEN {row_dim}
                    ELSE {residual}
                END AS row_dim,
                {schema_madlib}.__matrix_row_split({in_args[val]}::float8[],
                                                   {col_dim}::integer) AS row_vec,
                generate_series(1,
                                ceil((array_upper({in_args[val]}, 1) -
                                      array_lower({in_args[val]}, 1) + 1
                                      )::FLOAT8 / {col_dim})::integer
                                ) as bcol_id
            FROM
                {matrix_in}
        ) t1
        GROUP BY brow_id, bcol_id
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({out_args[row]})')
        """.format(**locals()))
| |
| |
def matrix_unblockize(schema_madlib, matrix_in, in_args, matrix_out, out_args):
    """ Convert a matrix in block format back to dense (row, vector) format.

    Block height/width are measured from the (1, 1) block; the width of the
    last column of blocks (col_residual) is measured separately so that
    ragged trailing blocks are stitched back correctly. Each block is
    unnested into row vectors (__matrix_unnest_block) and the fragments of
    every logical row are merged with __matrix_unblockize_agg.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Name of the table containing the block matrix
        @param in_args: str, Name-value pair string containing options for matrix_in
        @param matrix_out: str, Name of the output table
        @param out_args: str, Name-value pair string containing options for matrix_out

    Returns:
        None
    Side effect:
        Creates matrix_out in dense format
    """
    in_args = parse_matrix_args(in_args)
    _validate_block(matrix_in, in_args)
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)

    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': in_args['row'],
                                                  'val': in_args['val']})
    # Dimensions of a full (non-ragged) block, taken from block (1, 1).
    rv = plpy.execute("""
        SELECT
            array_upper({in_args[val]}, 1) - array_lower({in_args[val]}, 1) + 1 AS row_dim,
            array_upper({in_args[val]}, 2) - array_lower({in_args[val]}, 2) + 1 AS col_dim
        FROM
            {matrix_in}
        WHERE
            {in_args[row]} = 1 and {in_args[col]} = 1
        """.format(matrix_in=matrix_in, in_args=in_args))
    row_dim = rv[0]['row_dim']
    col_dim = rv[0]['col_dim']

    rv = plpy.execute("""
        SELECT max({in_args[col]}::bigint) AS max_colid
        FROM {matrix_in}
        WHERE {in_args[row]}::bigint = 1
        """.format(matrix_in=matrix_in, in_args=in_args))
    max_colid = rv[0]['max_colid']

    # Width of the last (possibly ragged) column of blocks.
    rv = plpy.execute("""
        SELECT
            array_upper({in_args[val]}, 2) -
            array_lower({in_args[val]}, 2) + 1 AS col_residual
        FROM
            {matrix_in}
        WHERE
            {in_args[row]}::bigint = 1 and {in_args[col]}::bigint = {max_colid}
        """.format(matrix_in=matrix_in, in_args=in_args, max_colid=max_colid))
    col_residual = rv[0]['col_residual']

    total_col_dim = (max_colid - 1) * col_dim + col_residual

    # Fix: honor the caller-requested value column name (out_args['val'])
    # instead of hard-coding 'row_vec', consistent with out_args['row'] usage.
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            row as {out_args[row]},
            {schema_madlib}.__matrix_unblockize_agg(
                {total_col_dim}::integer,
                ((col-1) * {col_dim} + 1)::integer,
                row_vec::float8[]) AS {out_args[val]}
        FROM
        (
            SELECT
                ({in_args[row]}::bigint - 1) * {row_dim}::bigint +
                    generate_series(1,
                                    array_upper({in_args[val]}::float8[], 1) -
                                    array_lower({in_args[val]}::float8[], 1) + 1) AS row,
                {in_args[col]} AS col,
                {schema_madlib}.__matrix_unnest_block({in_args[val]}::float8[]) AS row_vec
            FROM
                {matrix_in}
        ) t1
        GROUP BY
            row
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({out_args[row]})')
        """.format(**locals()))
| |
| # ------------------------------------------------------------------------------ |
| # -- Visitor operations -------------------------------------------------------- |
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_extract(schema_madlib, matrix_in, in_args, extract_dim, index=1):
    """ Extract a single row or column of a matrix as an array.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Name of the table containing matrix
        @param in_args: str, Name-value pair string containing options for matrix_in
        @param extract_dim: int, 1 to extract a row, 2 to extract a column
        @param index: int, 1-based position of the row/column to extract

    Returns:
        Array holding the elements of the requested row or column.
    Side effect:
        None
    """
    _validate_input_table(matrix_in)
    in_args = parse_matrix_args(in_args)
    dims = get_dims(matrix_in, in_args)

    _assert(min(dims) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    _assert(index > 0, "Matrix error: The index ({0}) should be positive".format(index))
    _assert(dims[extract_dim - 1] >= index,
            "Matrix error: The index ({0}) is invalid compared to "
            "dimension size ({1})".format(index, dims[extract_dim - 1]))

    if not _is_sparse(matrix_in, in_args['val']):
        validate_dense(matrix_in, in_args, dims[0])
        dense_func = (_matrix_extract_row_dense if extract_dim == 1
                      else _matrix_extract_col_dense)
        return dense_func(schema_madlib, matrix_in, in_args, index)
    validate_sparse(matrix_in, in_args)
    return _matrix_extract_sparse(schema_madlib, matrix_in, in_args,
                                  extract_dim, dims, index)
| |
| |
def _matrix_extract_sparse(schema_madlib, matrix_in, in_args,
                           dim, dim_sizes, index=1):
    """ Extract row/column `index` of a sparse matrix as a dense array.

    Missing (sparse) entries are filled with 0 via a RIGHT JOIN against
    generate_series over the full orthogonal dimension, so the returned
    array always has the matrix's full length along that dimension.

    @param dim: 1 to extract a row, 2 to extract a column
    @param dim_sizes: (row_count, col_count) of the matrix
    """
    # we define two variables:
    #   e_dim - the column name of the dimension to be extracted.
    #           If dim = 1, e_dim = row
    #           If dim = 2, e_dim = col
    #   o_dim - the column name of the dimension orthogonal to e_dim
    e_dim = ('row', 'col')[dim - 1]
    e_dim = in_args[e_dim]
    o_dim = ('col', 'row')[dim - 1]
    o_dim_size = dim_sizes[0 if o_dim == 'row' else 1]
    o_dim = in_args[o_dim]
    r = plpy.execute("""
        SELECT
            array_agg(val order by {o_dim}) as res
        FROM (
            SELECT
                o_dim_values as {o_dim},
                coalesce(a.val::FLOAT8, 0::FLOAT8) AS val
            FROM (
                 SELECT {in_args[val]}::FLOAT8 as val,
                        {o_dim}::BIGINT as {o_dim}
                 FROM
                    {matrix_in} m
                 WHERE
                    {in_args[val]} IS NOT NULL AND
                    {e_dim}::BIGINT = {index}::BIGINT
            ) AS a
            RIGHT JOIN
                generate_series(1, {o_dim_size}) o_dim_values
            ON
                a.{o_dim}::bigint = o_dim_values::bigint
        ) t
        """.format(**locals()))

    _assert(r and len(r) == 1,
            "Matrix error: Invalid index ({0}) for matrix {1}".format(index, matrix_in))
    return r[0]['res']
| |
| |
def _matrix_extract_row_dense(schema_madlib, matrix_in, in_args, index=1):
    """ Extract row `index` of a dense matrix as an array.

    The in-database assert rejects rows whose vector contains NULLs, since
    a dense matrix must be fully populated.
    """
    r = plpy.execute("""
        SELECT
            m.{in_args[val]} AS res,
            {schema_madlib}.assert(not {schema_madlib}.array_contains_null(m.{in_args[val]}),
                            'Matrix error: Element in dense matrix should not be NULL')
        FROM
            {matrix_in} AS m
        WHERE
            m.{in_args[row]}::BIGINT = {index}::BIGINT
        """.format(**locals()))
    # Exactly one matching row is expected for a valid 1-based index.
    _assert(len(r) == 1,
            "Matrix error: Invalid row index ({0}) for matrix {1}".format(index, matrix_in))
    return r[0]['res']
| |
| |
def _matrix_extract_col_dense(schema_madlib, matrix_in, in_args, index=1):
    """ Extract column `index` of a dense matrix as an array ordered by row.

    The in-database assert rejects NULL elements at the requested position,
    since a dense matrix must be fully populated.
    """
    r = plpy.execute("""
        SELECT
            array_agg(val order by row) as res
        FROM (
            SELECT
                {in_args[row]}::BIGINT AS row,
                {in_args[val]}[{index}] AS val,
                {schema_madlib}.assert({in_args[val]}[{index}] is not NULL,
                               'Matrix error: Element in dense matrix should not be NULL')
            FROM
                {matrix_in}
        ) t
        """.format(**locals()))
    # Fix: this helper extracts a COLUMN; the previous message incorrectly
    # reported an invalid "row index".
    _assert(len(r) == 1,
            "Matrix error: Invalid column index ({0}) for matrix {1}".format(index, matrix_in))
    return r[0]['res']
| |
| # ------------------------------------------------------------------------------ |
| # -- Extreme value operations -------------------------------------------------- |
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_extremum(schema_madlib, matrix_in, in_args, dim, matrix_out, op,
                    fetch_index=False):
    """ Compute per-row or per-column extrema (max/min) of a matrix.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Name of the table containing matrix
        @param in_args: str, Name-value pair string containing options for matrix_in
        @param dim: int, 1 = extremum for each column, 2 = extremum for each row
        @param matrix_out: str, Output table holding an array of extrema and,
                           when fetch_index is True, an array of the
                           first-occurrence indices
        @param op: str, Either 'max' or 'min' (case-insensitive)
        @param fetch_index: bool, Whether to also record extremum positions

    Returns:
        None
    Side effect:
        Creates matrix_out with the requested extrema.
    """
    _validate_input_table(matrix_in)

    in_args = parse_matrix_args(in_args)
    dims = get_dims(matrix_in, in_args)

    op = op.lower()
    _assert(min(dims) > 0, "Matrix error: Invalid dimensions for input matrix")
    _assert(dim in (1, 2),
            "Matrix error: Invalid dimensionality: {0}. It should be either 1 or 2".format(dim))

    if _is_sparse(matrix_in, in_args['val']):
        validate_sparse(matrix_in, in_args)
        _matrix_extremum_sparse(schema_madlib, matrix_in, in_args,
                                matrix_out, fetch_index,
                                dim, dims, op)
    elif dim == 2:
        validate_dense(matrix_in, in_args, dims[0])
        _matrix_row_extremum_dense(schema_madlib, matrix_in, in_args,
                                   matrix_out, fetch_index, op)
    else:
        validate_dense(matrix_in, in_args, dims[0])
        _matrix_col_extremum_dense(schema_madlib, matrix_in, in_args,
                                   matrix_out, fetch_index, dims[1], op)
| |
| |
def _matrix_extremum_sparse(schema_madlib, matrix_in, in_args, matrix_out,
                            fetch_index, dim, dim_sizes, op):
    """ Compute per-row or per-column extrema of a sparse matrix.

    Each orthogonal slice is densified in-database (__matrix_densify_agg),
    the extremum and its position are found with array_max_index /
    array_min_index, and slices absent from the sparse input are filled
    with 0 via a RIGHT OUTER JOIN against generate_series.

    Args:
        @param dim: Dimension identifier - can only be 1 or 2
                    1 = compute extremum for each column (i.e. aggregate over rows)
                    2 = compute extremum for each row (i.e. aggregate over columns)
        @param dim_sizes: The number of elements in each dimension (size must be 2)
        @param op: 'min' selects array_min_index; any other value selects
                   array_max_index

    Returns:
        None; creates matrix_out.
    """

    # we define two variables:
    #  a_dim = aggregated dimension, this is the dimension across which we compute
    #          the extremum. This dimension is aggregated and the number of elements
    #          in the result along this dimension will be 1
    #  o_dim = orthogonal dimension, the dimension orthogonal to a_dim
    #
    # When dim = 1, we aggregate across rows (a_dim = row) to get an extremum value for each column
    # When dim = 2, we aggregate across cols (a_dim = col) to get an extremum value for each row
    a_dim = ('row', 'col')[dim - 1]
    a_dim = in_args[a_dim]
    o_dim = ('col', 'row')[dim - 1]  # the other dimension
    o_dim_size = dim_sizes[0 if o_dim == 'row' else 1]
    a_dim_size = dim_sizes[1 if o_dim == 'row' else 0]
    o_dim = in_args[o_dim]

    agg_index = ''
    if fetch_index:
        # First-occurrence positions of the extrema, ordered like the values.
        agg_index = "array_agg({a_dim} order by {o_dim}) as index,".format(**locals())

    if op == 'min':
        extremum_name = 'min'
        vector_func = 'array_min_index'
    else:
        extremum_name = 'max'
        vector_func = 'array_max_index'
    # vector_func returns a 2-element array: [extremum value, its index];
    # the coalesce(...) pair below splits those out, defaulting empty slices to 0.
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {agg_index}
            array_agg({in_args[val]} order by {o_dim}) as {extremum_name}
        FROM (
            SELECT
                {o_dim},
                coalesce({in_args[val]}[2]::BIGINT, 0) as {a_dim},
                coalesce({in_args[val]}[1], 0) as {in_args[val]}
            FROM (
                SELECT
                    {o_dim}::BIGINT as {o_dim},
                    {schema_madlib}.{vector_func}(
                        {schema_madlib}.__matrix_densify_agg(
                            {a_dim_size}::INTEGER,
                            {a_dim}::INTEGER,
                            {in_args[val]}::FLOAT8))
                        AS {in_args[val]}
                FROM
                    {matrix_in}
                WHERE
                    {in_args[val]} IS NOT NULL
                GROUP BY
                    {o_dim}) i
            RIGHT OUTER JOIN
                generate_series(1, {o_dim_size}) b
            ON
                i.{o_dim} = b) m
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({extremum_name})')
        """.format(**locals()))
| |
| |
def _matrix_col_extremum_dense(schema_madlib, matrix_in, in_args, matrix_out,
                               fetch_index, col_count, op):
    """ Compute the per-column extremum (max/min) of a dense matrix.

    Unnests each row vector into (row, col, val) triples, ranks the values
    within each column (ties broken by lowest row id, giving the first
    occurrence), and aggregates the top-ranked value per column — plus its
    row index when fetch_index is set — into arrays ordered by column.

    @param op: 'min' or 'max'; any value other than 'min' behaves as 'max'
    """
    agg_index = ''
    if fetch_index:
        # Fix: dropped a no-op .format(**locals()) — the literal has no
        # replacement fields.
        agg_index = "array_agg(row order by col) as index,"

    if op == 'min':
        extremum_name = 'min'
        sort_order = 'asc'
    else:
        extremum_name = 'max'
        sort_order = 'desc'

    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {agg_index}
            array_agg(val order by col) as {extremum_name}
        FROM
            (SELECT
                row,
                col,
                val,
                row_number() over (partition by col order by val {sort_order}, row) as rn
            FROM
                (SELECT
                    unnest({in_args[val]}) as val,
                    {in_args[row]}::BIGINT as row,
                    generate_series(1, {col_count}) as col
                FROM
                    {matrix_in}) m) t
        WHERE
            rn = 1
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({extremum_name})')
        """.format(**locals()))
| |
| |
def _matrix_row_extremum_dense(schema_madlib, matrix_in, in_args, matrix_out, fetch_index, op):
    """ Compute the per-row extremum (max/min) of a dense matrix.

    Applies array_max_index / array_min_index to each row vector; each call
    returns [extremum value, its position]. Values and (optionally) their
    positions are aggregated into arrays ordered by row id.

    @param op: 'min' or 'max'; any value other than 'min' behaves as 'max'
    """
    agg_index = ''
    if fetch_index:
        agg_index = 'array_agg(value_index[2] order by row) as index,'

    # Fix: removed dead pre-assignments ('max' / 'array_argmax') that were
    # unconditionally overwritten by the if/else below; 'array_argmax' was
    # never used.
    if op == 'min':
        extremum_name = 'min'
        vector_func_name = 'array_min_index'
    else:
        extremum_name = 'max'
        vector_func_name = 'array_max_index'

    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {agg_index}
            array_agg(value_index[1] order by row) as {extremum_name}
        FROM
            (SELECT
                {in_args[row]}::BIGINT as row,
                {schema_madlib}.{vector_func_name}({in_args[val]}) as value_index
            FROM
                {matrix_in}) m
        WHERE
            value_index[2] <> 0
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({extremum_name})')
        """.format(**locals()))
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_max(schema_madlib, matrix_in, in_args, dim, matrix_out,
               fetch_index=False):
    """ Compute per-row or per-column maxima; see matrix_extremum. """
    matrix_extremum(schema_madlib, matrix_in, in_args,
                    dim, matrix_out, 'max', fetch_index)
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_min(schema_madlib, matrix_in, in_args, dim, matrix_out,
               fetch_index=False):
    """ Compute per-row or per-column minima; see matrix_extremum. """
    matrix_extremum(schema_madlib, matrix_in, in_args,
                    dim, matrix_out, 'min', fetch_index)
| |
| # ------------------------------------------------------------------------------ |
| # -- Reduction operations ------------------------------------------------------ |
| # ------------------------------------------------------------------------------ |
| |
| |
def _matrix_reduction_op_sparse(schema_madlib, matrix_in, in_args,
                                dim, in_dims, expr):
    """ Apply a reduction expression along one dimension of a sparse matrix.

    @param expr: SQL aggregate expression template over the column `value`
                 (e.g. 'sum(value)'); it may reference {normalizer} (and any
                 other local below), which is substituted via str.format
                 before the query is built.

    Returns:
        Array of reduced values, one per entry of the non-aggregated
        dimension, ordered by that dimension's index.
    """
    # When dim = 1, we aggregate across rows to get a reduction for each column
    # When dim = 2, we aggregate across cols to get a reduction for each row

    # we define three variables:
    #   a_dim = column name of the NON-aggregated dimension: results are
    #           grouped and ordered by it (dim=1 -> col, dim=2 -> row)
    #   n_entries = number of entries along a_dim, i.e. the length of the
    #           result array
    #   normalizer = Number of elements along the aggregated dimension.
    #           This is used as a normalizer in reduction operations that
    #           compute central tendency (averages)
    if dim == 1:
        a_dim = in_args['col']
        n_entries = in_dims[1]
        normalizer = in_dims[0]
    else:
        a_dim = in_args['row']
        n_entries = in_dims[0]
        normalizer = in_dims[1]

    # Substitute {normalizer} etc. into the caller-provided aggregate template.
    expr = expr.format(**locals())
    # RIGHT OUTER JOIN against generate_series fills entries absent from the
    # sparse input with value 0 so every index contributes to the result.
    r = plpy.execute("""
        SELECT
            array_agg(op_val order by {a_dim}) as res
        FROM (
            SELECT
                {a_dim},
                {expr} as op_val
            FROM
                (SELECT
                    b as {a_dim},
                    coalesce(m.{in_args[val]}::FLOAT8, 0::FLOAT8) as value
                FROM
                    {matrix_in} m
                RIGHT OUTER JOIN
                    generate_series(1, {n_entries}) b
                ON m.{a_dim}::bigint = b::bigint) t
            GROUP BY {a_dim}
            ) op_t
        """.format(**locals()))
    _assert(len(r) == 1,
            "Matrix error: Bad matrix dimension, "
            "row:{row}, column:{col}".format(row=in_dims[0], col=in_dims[1]))
    return r[0]['res']
| |
| |
def _matrix_row_reduction_op_dense(schema_madlib, matrix_in, in_args,
                                   vector_op):
    """ Apply an array-valued reduction to each row of a dense matrix.

    @param vector_op: name of a madlib function taking a float8[] row vector
                      and returning a scalar (e.g. 'array_sum_big')

    Returns:
        Array of per-row reduction results, ordered by row id.
    """
    r = plpy.execute("""
        SELECT
            array_agg( {schema_madlib}.{vector_op}( {in_args[val]}::FLOAT8[] )
                       order by {in_args[row]}::BIGINT ) as res
        FROM
            {matrix_in} m
        """.format(**locals()))

    _assert(len(r) == 1,
            "Matrix error: Invalid values in value column, " + in_args['val'])
    return r[0]['res']
| |
| |
def _matrix_col_reduction_op_dense(schema_madlib, matrix_in, in_args,
                                   col_count, vector_op):
    """ Apply a SQL aggregate to each column of a dense matrix.

    Each row vector is unnested (paired with generate_series to label the
    column of every element), then `vector_op` aggregates per column.

    @param col_count: number of columns in the matrix
    @param vector_op: SQL aggregate name applied per column (e.g. 'sum', 'avg')

    Returns:
        Array of per-column reduction results, ordered by column id.
    """
    r = plpy.execute("""
        SELECT
            array_agg(val order by col) as res
        FROM (
            SELECT
                col,
                {vector_op}(val::float8) as val
            FROM
                (SELECT
                    unnest({in_args[val]}) as val,
                    {in_args[row]}::BIGINT as row,
                    generate_series(1, {col_count}) as col
                FROM
                    {matrix_in}) m
            GROUP BY
                col
        ) t
        """.format(**locals()))
    _assert(len(r) == 1,
            "Matrix error: Invalid values in value column, " + in_args['val'])
    return r[0]['res']
| |
| |
def matrix_reduction_op(schema_madlib, matrix_in, in_args, dim, vector_ops):
    """ Dispatch a reduction (e.g. sum, mean) along one dimension of a matrix.

    @param dim: 1 reduces each column, 2 reduces each row
    @param vector_ops: dict with keys 'sparse_agg_expr', 'dense_agg_op' and
                       'dense_array_op' selecting the operation per layout

    Returns:
        Array of reduction results along the requested dimension.
    """
    _validate_input_table(matrix_in)

    in_args = parse_matrix_args(in_args)
    dims = get_dims(matrix_in, in_args)
    _assert(min(dims) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    _assert(dim in (1, 2),
            "Matrix error: Invalid value for dimension ({0}). "
            "The dimension should be 1 for column and 2 for row".format(dim))

    if _is_sparse(matrix_in, in_args['val']):
        validate_sparse(matrix_in, in_args)
        return _matrix_reduction_op_sparse(schema_madlib, matrix_in, in_args,
                                           dim, dims,
                                           vector_ops['sparse_agg_expr'])
    validate_dense(matrix_in, in_args, dims[0])
    if dim == 1:
        return _matrix_col_reduction_op_dense(schema_madlib, matrix_in, in_args,
                                              dims[1],
                                              vector_ops['dense_agg_op'])
    return _matrix_row_reduction_op_dense(schema_madlib, matrix_in, in_args,
                                          vector_ops['dense_array_op'])
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_sum(schema_madlib, matrix_in, in_args, dim):
    """ Sum the elements of a matrix along one dimension.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Name of the table containing matrix
        @param in_args: str, Name-value pair string containing options for matrix_in
        @param dim: int, 1 for per-column sums, 2 for per-row sums

    Returns:
        Array of ordered sums along the requested dimension.
    Side effect:
        None
    """
    return matrix_reduction_op(
        schema_madlib, matrix_in, in_args, dim,
        {'sparse_agg_expr': 'sum(value)',
         'dense_agg_op': 'sum',
         'dense_array_op': 'array_sum_big'})
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_mean(schema_madlib, matrix_in, in_args, dim):
    """ Average the elements of a matrix along one dimension.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Name of the table containing matrix
        @param in_args: str, Name-value pair string containing options for matrix_in
        @param dim: int, 1 for per-column means, 2 for per-row means

    Returns:
        Array of ordered mean values along the requested dimension.
    Side effect:
        None
    """
    return matrix_reduction_op(
        schema_madlib, matrix_in, in_args, dim,
        {'sparse_agg_expr': 'sum(value)::float8/{normalizer}',
         'dense_agg_op': 'avg',
         'dense_array_op': 'array_mean'})
| |
| # ------------------------------------------------------------------------------ |
| # -- Mathematical operations ------------------------------------------------------ |
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_trans(schema_madlib, matrix_in, in_args,
                 matrix_out, out_args):
    """ Transpose a matrix, optionally converting its storage format.

    Args:
        @param schema_madlib: Name of the MADlib schema
        @param matrix_in: Str, Name of the matrix to be transposed
        @param in_args: Str, Key-value string ("key1=value1, key2=value2 ...")
                        providing column names for matrix_in, or an
                        already-parsed dictionary of the same
        @param matrix_out: Str, Name of the matrix to contain transposed result
        @param out_args: Str, Key-value string (or dictionary) providing
                        column names for matrix_out; may include 'fmt'
                        ('sparse' or 'dense') to request an output format
                        different from the input's
    Returns:
        None
    Side effect:
        Creates matrix_out containing the transpose of matrix_in
    """
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)
    in_args = parse_matrix_args(in_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': in_args['row'],
                                                  'col': in_args['col'],
                                                  'val': in_args['val']})
    fmt = out_args.get('fmt')

    # Pick the validation/transpose/conversion trio for the input layout; a
    # format change is needed only when 'fmt' requests the opposite layout.
    if _is_sparse(matrix_in, in_args['val']):
        validate_sparse(matrix_in, in_args)
        trans_func, convert_func = _matrix_trans_sparse, matrix_densify
        needs_conversion = fmt == 'dense'
    else:
        validate_dense(matrix_in, in_args)
        trans_func, convert_func = _matrix_trans_dense, matrix_sparsify
        needs_conversion = fmt == 'sparse'

    if needs_conversion:
        interim = "pg_temp." + unique_string() + "_out1"
        trans_func(schema_madlib, matrix_in, in_args, interim, out_args)
        convert_func(schema_madlib, interim, out_args, matrix_out, out_args)
        plpy.execute('DROP TABLE IF EXISTS %s' % interim)
    else:
        trans_func(schema_madlib, matrix_in, in_args, matrix_out, out_args)
| |
| |
def _matrix_trans_sparse(schema_madlib, matrix_in, in_args,
                         matrix_out, out_args):
    """ Transpose a sparse matrix by swapping its row and column labels.

    Values are copied unchanged; input row ids become output column ids and
    vice versa.
    """
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {in_args[row]} AS {out_args[col]},
            {in_args[col]} AS {out_args[row]},
            {in_args[val]} AS {out_args[val]}
        FROM
            {matrix_in}
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({out_args[col]})')
        """.format(matrix_in=matrix_in,
                   in_args=in_args,
                   matrix_out=matrix_out,
                   out_args=out_args))
| |
| |
def _matrix_trans_dense(schema_madlib, matrix_in, in_args,
                        matrix_out, out_args):
    """ Transpose a dense matrix.

    Each row vector is unnested into (row, col, value) triples, then the
    triples are regrouped by original column and densified back into
    vectors (__matrix_densify_agg), so output row i holds input column i.
    """
    (row_dim, col_dim) = get_dims(matrix_in, in_args)
    # Output vectors have row_dim elements; arrays larger than the platform
    # word limit (Python 2 sys.maxint) cannot be built.
    _assert(row_dim < sys.maxint,
            "Matrix error: Matrix {0} has too many rows. "
            "This cannot be transposed in a dense format due to "
            "restrictions on maximum array size.".format(matrix_in))
    # Unique identifiers avoid colliding with user column names in the query.
    temp_row, temp_col, temp_val = (unique_string() for i in range(3))
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {temp_col} AS {out_args[row]},
            {schema_madlib}.__matrix_densify_agg(
                {row_dim}::integer,
                {temp_row}::integer,
                {temp_val}) AS {out_args[val]}
        FROM
        (   SELECT {in_args[row]} as {temp_row},
                    unnest({in_args[val]}) AS {temp_val},
                    generate_series(1, {col_dim}) AS {temp_col}
            FROM
                {matrix_in}
        ) t1
        GROUP BY
            {temp_col}
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({out_args[row]})')
        """.format(**locals()))
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_mult(schema_madlib, matrix_a, a_args,
                matrix_b, b_args, matrix_out, out_args=None):
    """ Multiply two matrices, handling dense/sparse layouts and transposes.

    If both inputs are dense the dense path is used; otherwise both are
    brought to sparse form and multiplied sparsely. The output format can
    be forced via out_args 'fmt' ('dense' or 'sparse'); otherwise it
    follows the natural format of the chosen path. Intermediate tables are
    created in pg_temp and dropped at the end.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_a: str, Name of the table containing the left matrix
        @param a_args: str, Name-value pair string for matrix_a (may include 'trans')
        @param matrix_b: str, Name of the table containing the right matrix
        @param b_args: str, Name-value pair string for matrix_b (may include 'trans')
        @param matrix_out: str, Name of the output table
        @param out_args: str, Name-value pair string for matrix_out (may include 'fmt')

    Returns:
        None
    Side effect:
        Creates matrix_out with the product A x B (after any requested transposes).
    """
    _validate_input_table(matrix_a)
    _validate_input_table(matrix_b)
    _validate_output_table(matrix_out)

    a_args = parse_matrix_args(a_args)
    b_args = parse_matrix_args(b_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': a_args['row'],
                                                  'col': a_args['col'],
                                                  'val': a_args['val']})
    # Intermediate tables accumulated here are dropped at the end.
    to_drop = []

    if a_args['trans']:
        # A is transposed eagerly; it's needed in transposed form on every path.
        matrix_trans_a = "pg_temp." + unique_string() + "_trans_a"
        matrix_trans(schema_madlib, matrix_a, a_args, matrix_trans_a, None)
        matrix_a = matrix_trans_a
        to_drop.append(matrix_trans_a)
    # we defer the transpose operation for matrix_b since dense multiplication will
    # require to transpose B. We should avoid double transpose.

    a_dim = get_dims(matrix_a, a_args)
    b_dim = get_dims(matrix_b, b_args)
    _assert(min(a_dim) > 0 and min(b_dim) > 0,
            "Matrix error: Invalid dimensions for input matrices")
    # Inner dimensions must agree; account for B's pending transpose.
    left_dim = a_dim[1]
    right_dim = b_dim[0] if not b_args['trans'] else b_dim[1]
    _assert(left_dim == right_dim,
            "Matrix error: Dimension mismatch for matrix multiplication.\n"
            "Left matrix, col dimension = {0}, "
            "Right matrix, row dimension = {1}".format(left_dim, right_dim))

    fmt = out_args['fmt'] if 'fmt' in out_args else None
    # both matrix_a and matrix_b are dense
    if (not _is_sparse(matrix_a, a_args['val']) and
            not _is_sparse(matrix_b, b_args['val'])):
        validate_dense(matrix_a, a_args, row_dim=a_dim[0])
        validate_dense(matrix_b, b_args, row_dim=b_dim[0])
        if (fmt is None or fmt == 'dense'):
            # Dense result written directly to the output table.
            _matrix_mult_dense(schema_madlib, matrix_a, a_args,
                               matrix_b, b_args, matrix_out, out_args,
                               is_b_transposed=b_args['trans'])
        else:
            # Sparse output requested: multiply dense, then sparsify.
            matrix_temp = "pg_temp." + unique_string() + "_temp"
            _matrix_mult_dense(schema_madlib, matrix_a, a_args,
                               matrix_b, b_args, matrix_temp, _matrix_default_args(),
                               is_b_transposed=b_args['trans'])
            matrix_sparsify(schema_madlib, matrix_temp, _matrix_default_args(), matrix_out, out_args)
            to_drop.append(matrix_temp)
    else:
        # sparsify matrix_a if not sparse
        if not _is_sparse(matrix_a, a_args['val']):
            matrix_temp_a = "pg_temp." + unique_string() + "_temp_a"
            matrix_sparsify(schema_madlib, matrix_a, a_args,
                            matrix_temp_a, _matrix_default_args())
            matrix_a = matrix_temp_a
            a_args = _matrix_default_args()
            to_drop.append(matrix_a)
        else:
            validate_sparse(matrix_a, a_args)

        # sparsify matrix_b if not sparse
        if not _is_sparse(matrix_b, b_args['val']):
            matrix_temp_b = "pg_temp." + unique_string() + "_temp_b"
            matrix_sparsify(schema_madlib, matrix_b, b_args, matrix_temp_b, None)
            matrix_b = matrix_temp_b
            to_drop.append(matrix_b)
            # NOTE(review): unlike the A branch, b_args is kept (only 'col'
            # backfilled) after sparsify — presumably matrix_sparsify with
            # out_args=None reuses b_args' names; confirm against matrix_sparsify.
            if 'col' not in b_args:
                b_args['col'] = _matrix_default_args()['col']
        else:
            validate_sparse(matrix_b, b_args)

        # use transpose of 'b' if specified by user
        if b_args['trans']:
            matrix_trans_b = "pg_temp." + unique_string() + "_trans_b"
            matrix_trans(schema_madlib, matrix_b, b_args, matrix_trans_b, None)
            matrix_b = matrix_trans_b
            to_drop.append(matrix_trans_b)

        # Sparse product; densify afterwards unless sparse output was requested.
        is_output_sparse = fmt and fmt == 'sparse'
        matrix_result = matrix_out if is_output_sparse else "pg_temp." + unique_string() + "_r"
        _matrix_mult_sparse(schema_madlib, matrix_a, a_args,
                            matrix_b, b_args,
                            matrix_result, _matrix_default_args())
        if not is_output_sparse:
            matrix_densify(schema_madlib,
                           matrix_result, _matrix_default_args(),
                           matrix_out, out_args)
            to_drop.append(matrix_result)

    for each_table in to_drop:
        plpy.execute('DROP TABLE IF EXISTS %s' % each_table)
| |
| |
def _matrix_mult_sparse(schema_madlib, matrix_a, a_args,
                        matrix_b, b_args, matrix_out, out_args):
    """ Multiply two sparse matrices.

    Joins nonzero entries of A and B on A.col = B.row, multiplies the
    matched values and sums the products per output (row, col). Zero and
    NULL entries are filtered out before the join since they cannot
    contribute to the product.
    """
    a_dim = get_dims(matrix_a, a_args)
    b_dim = get_dims(matrix_b, b_args)
    # Unique identifiers avoid colliding with user column names in the query.
    temp_row, temp_col, temp_val = (unique_string() for i in range(3))
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {temp_row} as {out_args[row]},
            {temp_col} as {out_args[col]},
            sum({temp_val}) AS {out_args[val]}
        FROM
        (
            SELECT
                a.{a_args[row]} AS {temp_row},
                b.{b_args[col]} AS {temp_col},
                a.{a_args[val]}::float8 * b.{b_args[val]}::float8 AS {temp_val}
            FROM
                {matrix_a} AS a,
                {matrix_b} AS b
            WHERE
                a.{a_args[val]} IS NOT NULL AND a.{a_args[val]} != 0 AND
                b.{b_args[val]} IS NOT NULL AND b.{b_args[val]} != 0 AND
                a.{a_args[col]}::bigint = b.{b_args[row]}::bigint
        ) t
        GROUP BY
            {temp_row}, {temp_col}
        m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY ({out_args[row]})')
        """.format(**locals()))

    # if the last row or column is all zeros then we need to add a redundant
    # entry to ensure the matrix dimensions can be interpreted
    res_row_dim, res_col_dim = get_dims(matrix_out, out_args)
    if res_row_dim != a_dim[0] or res_col_dim != b_dim[1]:
        plpy.execute("""
            INSERT INTO {matrix_out} VALUES ({row_count}, {col_count}, 0)
            """.format(matrix_out=matrix_out,
                       row_count=a_dim[0],
                       col_count=b_dim[1]))
| |
| |
def _matrix_mult_dense(schema_madlib, matrix_a, a_args,
                       matrix_b, b_args, matrix_out, out_args,
                       is_b_transposed=False):
    """ Multiply two dense matrices.

    Transposes B (unless already provided transposed) so that each output
    element is the dot product of a row of A and a row of B-transposed,
    computed via a cross join + array_dot. The intermediate dot products
    are stored as a sparse table and densified into the final output.

    @param is_b_transposed: True if matrix_b is already stored as B's
                            transpose, skipping the explicit transpose step
    """
    temp_row, temp_col, temp_val = (unique_string() for i in range(3))

    # compute trans of matrix_b to compute dot product between
    # rows of matrix_a and cols of matrix_b (this is necessary only if )
    if (is_b_transposed):
        # no need to explicitly transpose since it's already in transpose form
        matrix_b_trans = matrix_b
        b_trans_args = b_args
    else:
        matrix_b_trans = "pg_temp." + unique_string() + "_a6"
        b_trans_args = _matrix_default_args()
        _matrix_trans_dense(schema_madlib,
                            matrix_b, b_args,
                            matrix_b_trans, b_trans_args)

    b_trans_dim = get_dims(matrix_b_trans, b_trans_args)
    if b_trans_dim[0] >= (1024 * 1024 * 1024 / 8):
        # we are restricted by allocation limit of 1GB. Since array is of type
        # 8-byte float, we have a limitation on how big the array can be.
        plpy.error("Matrix error: Matrix {0} has too many columns. "
                   "This cannot be used in multiplication due to "
                   "restrictions on maximum array size.".format(matrix_b))
    # compute dot products in sparse format
    matrix_r_sparse = "pg_temp." + unique_string() + "_a7"
    r_s_args = _matrix_default_args()
    plpy.execute("""
        CREATE TABLE {matrix_r_sparse}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            a.{a_args[row]} AS {r_s_args[row]},
            b.{b_trans_args[row]} AS {r_s_args[col]},
            {schema_madlib}.array_dot(
                a.{a_args[val]}::float8[],
                b.{b_trans_args[val]}::float8[]) AS {r_s_args[val]}
        FROM
            {matrix_a} AS a,
            {matrix_b_trans} AS b
        m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY ({r_s_args[row]})')
        """.format(**locals()))

    # densify result since it'll always be dense
    plpy.execute("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
            `WITH (APPENDONLY=TRUE)') AS
        SELECT
            {r_s_args[row]} as {out_args[row]},
            {schema_madlib}.__matrix_densify_agg(
                {col_dim}::integer,
                {r_s_args[col]}::integer,
                {r_s_args[val]}::float8) AS {out_args[val]}
        FROM
            {matrix_r_sparse}
        GROUP BY
            {r_s_args[row]}
        m4_ifdef(`__POSTGRESQL__', `',
            `DISTRIBUTED BY ({out_args[row]})')
        """.format(col_dim=b_trans_dim[0], **locals()))
    # Drop intermediates; the transpose table only exists if we created it here.
    plpy.execute("DROP TABLE IF EXISTS " + matrix_r_sparse)
    if not is_b_transposed:
        plpy.execute("DROP TABLE IF EXISTS " + matrix_b_trans)
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_square(schema_madlib, matrix_in, in_args,
                  matrix_out, out_args):
    """ Compute the square (M * M) of a square matrix.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Name of the table containing the matrix
        @param in_args: str or dict, Name-value pair options for matrix_in
        @param matrix_out: str, Name of the table to contain the result
        @param out_args: str or dict, Name-value pair options for matrix_out;
            may include 'fmt' to force 'dense' or 'sparse' output

    Returns:
        None
    """
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)

    in_args = parse_matrix_args(in_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': in_args['row'],
                                                  'col': in_args['col'],
                                                  'val': in_args['val']})

    dim = get_dims(matrix_in, in_args)
    _assert(dim[1] == dim[0],
            "Matrix error: Square operation is only defined for square matrices")

    fmt = None if 'fmt' not in out_args else out_args['fmt']
    is_input_sparse = _is_sparse(matrix_in, in_args['val'])
    validate_func = validate_sparse if is_input_sparse else validate_dense
    mult_func = _matrix_mult_sparse if is_input_sparse else _matrix_mult_dense
    # The multiplication result has the same format as the input, so a
    # requested format change means densifying a sparse result or sparsifying
    # a dense one.
    # BUG FIX: the conversion function was inverted w.r.t. the input format
    # (sparsify was applied to an already-sparse result and vice-versa).
    change_func = matrix_densify if is_input_sparse else matrix_sparsify
    # a change is needed only when 'fmt' names the opposite of the input format
    change_fmt = fmt and fmt == ('sparse', 'dense')[is_input_sparse]

    validate_func(matrix_in, in_args)
    matrix_r = "pg_temp." + unique_string() if change_fmt else matrix_out
    mult_func(schema_madlib, matrix_in, in_args,
              matrix_in, in_args, matrix_r, out_args)
    if change_fmt:
        change_func(schema_madlib, matrix_r, out_args, matrix_out, out_args)
        plpy.execute('DROP TABLE IF EXISTS %s' % matrix_r)
| # ------------------------------------------------------------------------------ |
| |
| |
def _create_zero_matrix(schema_madlib, matrix_out, out_args, in_dim, fmt):
    """ Create an all-zero matrix table with the given dimensions.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_out: str, Name of the output table (created here)
        @param out_args: dict, Column-name options for matrix_out
        @param in_dim: indexable, [row_dim, col_dim] of the matrix to create
        @param fmt: str or None, None or 'sparse' creates a sparse table;
            any other value creates a dense table

    Returns:
        None
    """
    row_dim = in_dim[0]
    col_dim = in_dim[1]
    if fmt is None or fmt == 'sparse':
        # just add a single row with the dimensionality
        # (sparse convention: one (row_dim, col_dim, 0) entry encodes the
        # dimensions; all other entries are implicitly zero)
        plpy.execute("""
            CREATE TABLE {matrix_out}
            m4_ifdef(`__POSTGRESQL__', `',
                     `WITH (APPENDONLY=TRUE)') AS
            SELECT {row_dim} AS {out_args[row]},
                   {col_dim} AS {out_args[col]},
                   0::FLOAT8 AS {out_args[val]}
            m4_ifdef(`__POSTGRESQL__', `',
                     `DISTRIBUTED BY ({out_args[row]})');
            """.format(**locals()))
    else:
        # add multiple rows of all zero vectors
        plpy.execute("""
            CREATE TABLE {matrix_out}
            m4_ifdef(`__POSTGRESQL__', `',
                     `WITH (APPENDONLY=TRUE)') AS
            SELECT {out_args[row]},
                   {schema_madlib}.array_of_float({col_dim})::FLOAT8[] as {out_args[val]}
            FROM generate_series(1, {row_dim}) AS {out_args[row]}
            m4_ifdef(`__POSTGRESQL__', `',
                     `DISTRIBUTED BY ({out_args[row]})');
            """.format(**locals()))
| |
| |
def _matrix_scalar_mult_sparse_to_dense(schema_madlib, matrix_in, in_args,
                                        scalar, matrix_out, out_args, in_dim):
    """ Multiply a sparse matrix by a scalar, producing a dense output table.

    Rows absent from the sparse input (implicitly all-zero) are materialized
    as zero vectors via the RIGHT OUTER JOIN on generate_series.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Table containing the sparse input matrix
        @param in_args: dict, Column-name options for matrix_in
        @param scalar: numeric, Value to multiply each element by
        @param matrix_out: str, Name of the dense output table (created here)
        @param out_args: dict, Column-name options for matrix_out
        @param in_dim: indexable, [row_dim, col_dim] of the input matrix

    Returns:
        None
    """
    # BUG FIX: distribute by the output row column -- the created table's
    # columns are named from out_args, so in_args[row] may not exist in it.
    create_sql = """
        CREATE TABLE {matrix_out}
            ({out_args[row]} BIGINT, {out_args[val]} FLOAT8[])
        m4_ifdef(`__POSTGRESQL__', `',
                 `WITH (APPENDONLY=TRUE) DISTRIBUTED BY ({out_args[row]})');
        """
    row_dim = in_dim[0]
    col_dim = in_dim[1]
    plpy.execute(create_sql.format(**locals()))
    # For rows missing from the sparse input the joined value is NULL; such
    # rows densify to a zero vector (column 1 with value 0 as a placeholder).
    # (removed a meaningless 'plan =' binding on a plain plpy.execute call)
    plpy.execute("""
        INSERT INTO {matrix_out}
        SELECT
            b AS {in_args[row]},
            {schema_madlib}.__matrix_densify_agg(
                {col_dim},
                CASE WHEN {in_args[val]} IS NULL THEN 1 ELSE {in_args[col]}::INTEGER END,
                coalesce({in_args[val]} * {scalar}, 0)) AS {out_args[val]}
        FROM
            {matrix_in} m
            RIGHT OUTER JOIN
            generate_series(1, {row_dim}) b
            ON m.{in_args[row]} = b
        GROUP BY b
        """.format(**locals()))
| |
| |
def _matrix_scalar_mult_sparse_to_sparse(matrix_in, in_args, scalar, matrix_out, out_args):
    """ Multiply a sparse matrix by a scalar, keeping sparse output format.

    Args:
        @param matrix_in: str, Table containing the sparse input matrix
        @param in_args: dict, Column-name options for matrix_in
        @param scalar: numeric, Value to multiply each element by
        @param matrix_out: str, Name of the sparse output table (created here)
        @param out_args: dict, Column-name options for matrix_out

    Returns:
        None
    """
    # BUG FIX: distribute by the output row column -- the created table's
    # columns are named from out_args, so in_args[row] may not exist in it.
    create_sql = """
        CREATE TABLE {matrix_out}
            ({out_args[row]} BIGINT, {out_args[col]} BIGINT, {out_args[val]} FLOAT8)
        m4_ifdef(`__POSTGRESQL__', `',
                 `WITH (APPENDONLY=TRUE) DISTRIBUTED BY ({out_args[row]})');
        """
    plpy.execute(create_sql.format(**locals()))
    # the scalar is passed as a prepared-statement parameter ($1) rather than
    # interpolated into the SQL text
    plan = plpy.prepare("""
        INSERT INTO {matrix_out}
        SELECT
            {in_args[row]}::BIGINT,
            {in_args[col]}::BIGINT,
            {in_args[val]} * $1
        FROM
            {matrix_in}
        WHERE
            {in_args[val]} <> 0
        """.format(**locals()), ["FLOAT8"])
    plpy.execute(plan, [scalar])
| |
| |
def _matrix_scalar_mult_dense(schema_madlib, matrix_in, in_args,
                              scalar, matrix_out, out_args, fmt, in_dim):
    """ Multiply a dense matrix by a scalar.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Table containing the dense input matrix
        @param in_args: dict, Column-name options for matrix_in
        @param scalar: numeric, Value to multiply each element by
        @param matrix_out: str, Name of the output table (created here)
        @param out_args: dict, Column-name options for matrix_out
        @param fmt: str or None, 'sparse' to unnest the result into
            (row, col, val) form; anything else keeps the dense array form
        @param in_dim: indexable, [row_dim, col_dim] of the input matrix

    Returns:
        None
    """
    is_output_sparse = fmt and fmt == "sparse"
    # SQL fragment that multiplies each row's array by the scalar
    array_mult_arg = ("{schema_madlib}.array_scalar_mult("
                      " {in_args[val]}::float8[],"
                      " {scalar}::float8)".format(**locals()))
    if not is_output_sparse:
        col_arg = ''
        unnest_arg = array_mult_arg
    else:
        # for sparse output, unnest the multiplied array and add a col_id
        col_dim = in_dim[1]
        col_arg = "generate_series(1, {0})::BIGINT AS {1},".format(col_dim, out_args['col'])
        unnest_arg = "unnest(" + array_mult_arg + ")"

    plpy.execute("""
        CREATE TABLE {matrix_out} AS
        SELECT
            {in_args[row]}::BIGINT AS {out_args[row]},
            {col_arg}
            {unnest_arg} AS {out_args[val]}
        FROM
            {matrix_in}
        """.format(**locals()))
| |
| |
def matrix_scalar_mult(schema_madlib, matrix_in, in_args, scalar, matrix_out, out_args):
    """ Multiply matrix with a scalar.

    Args:
        @param schema_madlib: Str, Name of the schema containing madlib functions
        @param matrix_in: Str, Name of the table containing matrix
        @param in_args: Str, Name-value pair string containing options for matrix_in
        @param scalar: Same as the type of elements in matrix_in
        @param matrix_out: Str, Name of the matrix to contain the scaled result
        @param out_args: Str, Key-value string providing column names for
                        matrix_out. Like in_args, this can also be dictionary.

    Returns:
        None
    Side effect:
        Creates matrix_out.
    """
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)
    _assert(scalar is not None, "Matrix error: Invalid value for scalar")

    in_args = parse_matrix_args(in_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': in_args['row'],
                                                  'col': in_args['col'],
                                                  'val': in_args['val']})
    # dims are computed once here; the redundant second get_dims call inside
    # the subtransaction has been removed (nothing modifies matrix_in between)
    in_dim = get_dims(matrix_in, in_args)
    _assert(min(in_dim) > 0,
            "Matrix error: Invalid dimensions for input matrices")
    fmt = out_args['fmt'] if 'fmt' in out_args else None

    with plpy.subtransaction():
        if scalar == 0:
            # result is all zeros regardless of the input contents
            _create_zero_matrix(schema_madlib, matrix_out, out_args, in_dim, fmt)
        elif _is_sparse(matrix_in, in_args['val']):
            validate_sparse(matrix_in, in_args)
            if fmt is None or fmt == 'sparse':
                _matrix_scalar_mult_sparse_to_sparse(matrix_in, in_args,
                                                     scalar, matrix_out, out_args)
            else:
                _matrix_scalar_mult_sparse_to_dense(schema_madlib, matrix_in,
                                                    in_args, scalar,
                                                    matrix_out, out_args, in_dim)
        else:
            validate_dense(matrix_in, in_args)
            _matrix_scalar_mult_dense(schema_madlib, matrix_in, in_args,
                                      scalar, matrix_out, out_args, fmt, in_dim)
| # ------------------------------------------------------------------------------ |
| |
| |
def _matrix_vec_mult_dense(schema_madlib, matrix_in, in_args, vector):
    """ Compute matrix * vector for a dense matrix.

    Takes the dot product of each matrix row with the vector, then collects
    the per-row results into a single array ordered by row id.

    Returns:
        List of float, the product matrix_in * vector
    """
    query = """
        SELECT
            array_agg(val::float8 order by row::bigint) as res
        FROM (
            SELECT
                {in_args[row]} as row,
                {schema_madlib}.array_dot({in_args[val]}::float8[],
                                          $1::float8[]) as val
            FROM
                {matrix_in}
        ) l
        """.format(**locals())
    prepared = plpy.prepare(query, ["FLOAT8[]"])
    result = plpy.execute(prepared, [vector])
    _assert(len(result) == 1,
            "Matrix error: Invalid values in value column, " + in_args['val'])
    return result[0]['res']
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_vec_mult(schema_madlib, matrix_in, in_args, vector):
    """ Multiply matrix with vector.

    Args:
        @param schema_madlib: Str, Name of the schema containing madlib functions
        @param matrix_in: Str, Name of the table containing matrix
        @param in_args: Str, Name-value pair string containing options for matrix_in
        @param vector: List, whose type is the same as type of elements in matrix_in

    Returns:
        List, the result of matrix_in * vector
    """
    _validate_input_table(matrix_in)
    _assert(vector is not None, "Matrix error: Invalid input for vector")
    in_args = parse_matrix_args(in_args)
    in_dim = get_dims(matrix_in, in_args)
    _assert(min(in_dim) > 0,
            "Matrix error: Invalid dimensions for input matrices")
    _assert(in_dim[1] == len(vector),
            "Matrix error: Dimension mismatch between matrix ({0[0]} x {0[1]}) "
            "and vector ({1} x 1)".format(in_dim, len(vector)))

    # a sparse input is first densified into a temporary table, since the
    # multiplication itself only works on the dense layout
    densified = None
    if _is_sparse(matrix_in, in_args['val']):
        validate_sparse(matrix_in, in_args)
        densified = "pg_temp." + unique_string() + "_a5"
        matrix_densify(schema_madlib,
                       matrix_in, in_args,
                       densified, in_args)
        matrix_in = densified
    else:
        validate_dense(matrix_in, in_args)

    result = _matrix_vec_mult_dense(schema_madlib, matrix_in, in_args, vector)

    if densified is not None:
        plpy.execute("DROP TABLE IF EXISTS " + densified)
    return result
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_scale_and_add(schema_madlib, matrix_a, a_args,
                         matrix_b, b_args, scale, matrix_out, out_args, **kwargs):
    """
    Perform a scale-and-add operation on dense matrices: R = A + c*B.

    Args:
        @param schema_madlib Name of the schema where MADlib is installed
        @param matrix_a Matrix input A
        @param a_args Name-value pair options for matrix_a
        @param matrix_b Matrix input B
        @param b_args Name-value pair options for matrix_b
        @param scale Scale factor c in the scale and add operation
        @param matrix_out Output matrix R (created here)
        @param out_args Name-value pair options for matrix_out

    Returns:
        None

    Throws:
        plpy.error if any argument is invalid

    """
    # TODO: this function only works with dense matrices. Need to add sparse support
    _validate_input_table(matrix_a)
    _validate_input_table(matrix_b)
    _validate_output_table(matrix_out)
    # fail fast on an invalid scale before any table scans are performed
    # (previously this check ran only after dims were computed and validated)
    if scale is None:
        plpy.error("Matrix error : Scale cannot be NULL")

    a_args = parse_matrix_args(a_args)
    b_args = parse_matrix_args(b_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': a_args['row'],
                                                  'col': a_args['col'],
                                                  'val': a_args['val']})
    a_dim = get_dims(matrix_a, a_args)
    b_dim = get_dims(matrix_b, b_args)
    _assert(min(a_dim) > 0 and min(b_dim) > 0,
            "Matrix error: Invalid dimensions for input matrices")
    _assert(a_dim[0] == b_dim[0] and a_dim[1] == b_dim[1],
            "Matrix error: The dimensions of the two matrices don't match")

    validate_dense(matrix_a, a_args, row_dim=a_dim[0])
    validate_dense(matrix_b, b_args, row_dim=b_dim[0])

    # Create a dense matrix table
    plpy.execute(
        """
        CREATE TABLE {matrix_out} AS
        SELECT a.{a_args[row]} as {out_args[row]},
               {schema_madlib}.array_add(
                    a.{a_args[val]}::float8[],
                    {schema_madlib}.array_scalar_mult(
                        b.{b_args[val]}::float8[],
                        ({scale})::DOUBLE PRECISION))
                as {out_args[val]}
        FROM
            {matrix_a} as a, {matrix_b} as b
        WHERE a.{a_args[row]}::bigint = b.{b_args[row]}::bigint
        """.format(**locals()))
| # ------------------------------------------------------------------------------ |
| |
| |
| # ----------------------------------------------------------------------- |
| # Norm operations |
| # ----------------------------------------------------------------------- |
def _matrix_elem_norm(schema_madlib, matrix_in, in_args, p):
    """
    Compute the element-wise p-norm of a matrix (dense or sparse).
    Args:
        @param schema_madlib Name of the schema where MADlib is installed
        @param matrix_in Name of the source table
        @param in_args Name of the source table format args
        @param p Element-wise norm value
    Returns:
        @return norm value
    Throws:
        plpy.error if any argument is invalid
    """
    _validate_input_table(matrix_in)
    # BUG FIX: normalize in_args before any of its keys are accessed; the
    # parse_matrix_args call previously ran only after in_args['val'],
    # in_args['col'] etc. had already been read.
    in_args = parse_matrix_args(in_args)
    is_input_sparse = _is_sparse(matrix_in, in_args['val'])
    if is_input_sparse:
        # sparse: each stored element is a scalar, use plain SQL sum/pow
        sum_fun = "sum"
        pow_fun = "pow"
        val_expr = "{0[val]}::float8".format(in_args)
        validate_sparse(matrix_in, in_args)
    else:
        # dense: each row is an array, use the MADlib array functions
        sum_fun = "{0}.array_sum".format(schema_madlib)
        pow_fun = "{0}.array_pow".format(schema_madlib)
        val_expr = "{0[val]}::float8[]".format(in_args)
        validate_dense(matrix_in, in_args)
    norm = plpy.execute("""
        SELECT
            pow(sum(pow_norm), 1./{p}::float8) as res
        FROM (
            SELECT {sum_fun}(
                    {pow_fun}({val_expr}, {p}::float8)) AS pow_norm
            FROM {matrix_in}
        ) t
        """.format(**locals()))
    _assert(norm is not None and len(norm) == 1 and 'res' in norm[0],
            "Matrix error: invalid result")
    return norm[0]['res']
| # ------------------------------------------------------------------------------ |
| |
| |
def _matrix_induced_norm(schema_madlib, matrix_in, in_args, norm_type):
    """ Compute an induced (operator) norm of a matrix.

    Supported norm_type values: 'one' (max absolute column sum), 'infinity'
    (max absolute row sum), 'max' (max absolute element), and 'spectral',
    which is delegated to _matrix_spectral_norm.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Name of the source table
        @param in_args: dict, Column-name options for matrix_in
        @param norm_type: str, One of 'one', 'infinity', 'max', 'spectral'

    Returns:
        Norm value
    """
    _validate_input_table(matrix_in)
    if norm_type == "spectral":
        return _matrix_spectral_norm(schema_madlib, matrix_in, in_args)

    is_input_sparse = _is_sparse(matrix_in, in_args['val'])
    if is_input_sparse:
        validate_sparse(matrix_in, in_args)
        # sparse: aggregate scalar values, grouping by column ('one') or by
        # row ('infinity'); 'max' needs no grouping.
        # NOTE: these fragments open parens that are closed by the '))' in
        # the query template below.
        agg_double_fun = {'one': "sum(abs",
                          'infinity': "sum(abs",
                          'max': "max(abs"
                          }[norm_type]
        val_expr = "{0}::float8".format(in_args['val'])
        group_by_str = {'one': "GROUP BY " + in_args['col'],
                        'infinity': "GROUP BY " + in_args['row'],
                        'max': ""
                        }[norm_type]
    else:
        validate_dense(matrix_in, in_args)
        # dense: each row is an array; use the MADlib array aggregates.
        # Same paren-balancing convention as above.
        agg_double_fun = ({'one': "{0}.array_max({0}.__matrix_column_abs_sum_agg",
                           'infinity': "{0}.array_sum({0}.array_abs",
                           'max': "{0}.array_max({0}.array_abs"
                           }[norm_type]
                          ).format(schema_madlib)
        val_expr = "{0}::float8[]".format(in_args['val'])
        group_by_str = ""
    norm = plpy.execute(
        """
        SELECT max(v) as res
        FROM (
            -- output of inner query is 1 or more float8 values
            SELECT
                {agg_double_fun}({val_expr})) as v
            FROM
                {matrix_in}
            {group_by_str}
        ) t
        """.format(**locals()))
    _assert(norm is not None and len(norm) == 1 and 'res' in norm[0],
            "Matrix error: invalid result ")
    return norm[0]['res']
| # ------------------------------------------------------------------------------ |
| |
| |
def _matrix_spectral_norm(schema_madlib, matrix_in, in_args):
    """ Compute the spectral norm (largest singular value) of a matrix.

    The input is first exposed as a dense table/view with canonical column
    names ('row_id', 'row_vec'), then SVD is run requesting a single
    singular value; the result is read from position (1, 1) of the S matrix.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Name of the source table
        @param in_args: dict, Column-name options for matrix_in

    Returns:
        Spectral norm value
    """
    matrix_in_temp = None   # densified copy of a sparse input, if created
    matrix_in_view = None   # renaming view over a dense input, if created

    with plpy.subtransaction():
        if _is_sparse(matrix_in, in_args['val']):
            validate_sparse(matrix_in, in_args)
            # densify into a temp table with the canonical column names
            matrix_in_temp = "pg_temp." + unique_string() + "_in"
            matrix_densify(schema_madlib, matrix_in, in_args,
                           matrix_in_temp, 'row=row_id,val=row_vec')
            matrix_in = matrix_in_temp
            in_args = {'row': 'row_id', 'val': 'row_vec'}
        else:
            validate_dense(matrix_in, in_args)
            # create a view that exposes the canonical column names
            matrix_in_view = "pg_temp." + unique_string() + "_view"
            plpy.execute("""
                CREATE VIEW {matrix_in_view} AS
                (SELECT
                    {in_args[row]} as row_id,
                    {in_args[val]} as row_vec
                FROM {matrix_in})
                """.format(**locals()))
            matrix_in = matrix_in_view
            in_args = {'row': 'row_id', 'val': 'row_vec'}

        # svd() writes its results into tables named <prefix>_s, _u and _v
        matrix_prefix = "pg_temp." + unique_string()
        matrix_s = matrix_prefix + "_s"
        matrix_u = matrix_prefix + "_u"
        matrix_v = matrix_prefix + "_v"

        # request only the single largest singular value (last arg = 1)
        plpy.execute("""
            SELECT
                {schema_madlib}.svd('{matrix_in}', '{matrix_prefix}', '{in_args[row]}', 1)
            """.format(**locals()))

        # the largest singular value sits at (row_id, col_id) = (1, 1) of S
        res = plpy.execute("""
            SELECT value from {matrix_s}
            WHERE row_id = 1 AND col_id = 1 AND value is not NULL
            """.format(**locals()))

        _assert(len(res) == 1 and 'value' in res[0],
                "Matrix error: Failed to calculate spectral norm")

        spectral_norm = res[0]['value']

        # clean up every intermediate object created above
        if matrix_in_temp is not None:
            plpy.execute('DROP TABLE IF EXISTS ' + matrix_in_temp)
        if matrix_in_view is not None:
            plpy.execute('DROP VIEW IF EXISTS ' + matrix_in_view)

        plpy.execute('DROP TABLE IF EXISTS ' + matrix_s)
        plpy.execute('DROP TABLE IF EXISTS ' + matrix_u)
        plpy.execute('DROP TABLE IF EXISTS ' + matrix_v)

    return spectral_norm
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_norm(schema_madlib, matrix_in, in_args, norm_type):
    """
    Perform a matrix norm operation on a matrix
    Args:
        @param schema_madlib Name of the schema where MADlib is installed
        @param matrix_in Name of the source table
        @param in_args Name-value pair options for matrix_in
        @param norm_type Either a prefix of one of the supported norm types
            ('one', 'spectral', 'infinity', 'max', 'frobenius') or a positive
            number p for the element-wise p-norm. Defaults to 'fro' when empty.

    Returns:
        @return norm value

    Throws:
        plpy.error if any argument is invalid

    """
    _validate_input_table(matrix_in)
    in_args = parse_matrix_args(in_args)
    norm_type = 'fro' if not norm_type else str(norm_type).lower()

    # norm_type can be a prefix of any of the below supported types
    supported_norm_types = ['one', 'spectral', 'infinity', 'max', 'frobenius']
    # Use next() with a default instead of catching StopIteration, so a stray
    # StopIteration raised inside the norm helpers cannot be misinterpreted
    # as "norm type not found" (the old try spanned the helper calls too).
    norm_type_val = next((x for x in supported_norm_types
                          if x.startswith(norm_type)), None)
    if norm_type_val is not None:
        if norm_type_val == "frobenius":
            # frobenius norm is same as element-wise 2-norm
            return _matrix_elem_norm(schema_madlib, matrix_in, in_args, 2)
        return _matrix_induced_norm(schema_madlib, matrix_in, in_args, norm_type_val)

    # If norm_type is not a supported string then it has to be a positive float
    try:
        param = float(norm_type)
    except ValueError:
        plpy.error("Matrix Error: Invalid norm type: {0}. "
                   "It can either be a positive float or (subset string) of "
                   "one of the supported norm types ({1})"
                   .format(norm_type, ', '.join(sorted(supported_norm_types))))
    _assert(param > 0,
            "Matrix error: Element-wise norm ({0}) should be a positive value."
            .format(param))
    return _matrix_elem_norm(schema_madlib, matrix_in, in_args, param)
| # ------------------------------------------------------------------------------ |
| |
| |
| # ----------------------------------------------------------------------- |
| # Decomposition operations |
| # ----------------------------------------------------------------------- |
def matrix_transform_helper(schema_madlib, matrix_in, in_args, matrix_out,
                            out_args, dim, agg_funcs_info):
    """ Shared driver for transforms computed by a single aggregate call
    (e.g. matrix inverse) that returns the whole result matrix as one value.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Name of the source table
        @param in_args: dict, Column-name options for matrix_in
        @param matrix_out: str, Name of the output table (created here)
        @param out_args: dict, Column-name options for matrix_out; may include
            'fmt' = 'sparse' to sparsify the (dense) result
        @param dim: indexable, [row_dim, col_dim] of the input matrix
        @param agg_funcs_info: dict, Names of the aggregates to run, keyed by
            'sparse_agg' and 'dense_agg'

    Returns:
        None
    """
    fmt = None if 'fmt' not in out_args else out_args['fmt']
    is_input_sparse = _is_sparse(matrix_in, in_args['val'])
    if is_input_sparse:
        validate_sparse(matrix_in, in_args)
        # sparse aggregate takes both dims plus 0-based (row, col, val) triples
        agg_str = """{schema_madlib}.{agg_func}(
                        {n_rows}::integer,
                        {n_cols}::integer,
                        ({in_args[row]}-1)::integer,
                        ({in_args[col]}-1)::integer,
                        ({in_args[val]})::double precision
                    )
                  """.format(schema_madlib=schema_madlib, n_rows=dim[0], n_cols=dim[1],
                             in_args=in_args, agg_func=agg_funcs_info['sparse_agg'])
    else:
        validate_dense(matrix_in, in_args)
        # dense aggregate takes the row count plus 0-based row id and row array
        agg_str = """{schema_madlib}.{agg_func}(
                        ({n_rows})::integer,
                        ({in_args[row]}-1)::integer,
                        ({in_args[val]})::double precision[]
                    )
                  """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                             in_args=in_args, agg_func=agg_funcs_info['dense_agg'])
    is_output_sparse = fmt and fmt == 'sparse'
    # write into a temp table first when the caller asked for sparse output
    matrix_r = "pg_temp." + unique_string() if is_output_sparse else matrix_out
    plpy.execute("""
        CREATE TABLE {matrix_r} AS
        SELECT row_id as {out_args[row]},
               {schema_madlib}.get_row(res, row_id) as {out_args[val]}
        FROM (
            SELECT {agg_str} as res
            FROM {matrix_in}
        ) q, generate_series(1, {dim[1]}) as row_id
        """.format(**locals()))
    if is_output_sparse:
        matrix_sparsify(schema_madlib, matrix_r, out_args, matrix_out, out_args)
        plpy.execute('DROP TABLE IF EXISTS %s' % matrix_r)
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_inverse(schema_madlib, matrix_in, in_args, matrix_out, out_args):
    """ Compute the inverse of a matrix.

    The inverse of a matrix is typically dense even when the input is sparse,
    so the result is always produced in dense form; callers can request
    sparsification via out_args['fmt'] = 'sparse'.
    """
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)

    in_args = parse_matrix_args(in_args)
    default_names = {'row': in_args['row'],
                     'col': in_args['col'],
                     'val': in_args['val']}
    out_args = parse_matrix_args(out_args, in_default_args=default_names)

    in_dim = get_dims(matrix_in, in_args)
    _assert(min(in_dim) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    _assert(in_dim[0] == in_dim[1],
            "Matrix error: Inverse operation is only defined for square matrices")
    # FIXME: currently matrix inverse is computed on a single node after
    # collating a distributed matrix. This places a limit on the maximum
    # size of the matrix: GPDB limit = 1GB implying size * size < 10^9 / 8,
    # i.e. size <= 11100
    size_limit = 11100
    _assert(in_dim[0] <= size_limit,
            """Matrix error: Reached maximum limit for matrix inverse operation.
            Maximum limit for matrix size is {0} x {0} """.format(size_limit))

    transform_aggs = {'sparse_agg': '__matrix_sparse_inverse',
                      'dense_agg': '__matrix_dense_inverse'}
    matrix_transform_helper(schema_madlib, matrix_in, in_args, matrix_out,
                            out_args, in_dim, transform_aggs)
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_eigen(schema_madlib, matrix_in, in_args, matrix_out, out_args):
    """ Compute the eigen values of a matrix.

    The eigenvalues are repeated according to their algebraic multiplicity, so
    there are as many eigenvalues as rows in the matrix. Each eigenvalue is
    emitted as a complex value built from a (real, imaginary) pair.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param matrix_in: str, Name of the source table
        @param in_args: str or dict, Name-value pair options for matrix_in
        @param matrix_out: str, Name of the output table (created here)
        @param out_args: str or dict, Name-value pair options for matrix_out;
            the value column name defaults to 'eigen_values'

    Returns:
        None
    """
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)

    in_args = parse_matrix_args(in_args)
    dim = get_dims(matrix_in, in_args)
    _assert(min(dim) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    _assert(dim[0] == dim[1],
            "Matrix error: Eigen value is only defined for square matrices")
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': in_args['row'],
                                                  'col': in_args['col'],
                                                  'val': 'eigen_values'})
    # FIXME: currently matrix eigen is computed on a single node after collating
    # a distributed matrix. This places a limit on the maximum size of the matrix
    # GPDB limit = 1GB implying size * size < 10^9 / 8 i.e. size <= 11100
    max_size = 11100
    _assert(dim[0] <= max_size,
            """Matrix error: Reached maximum limit for matrix eigen operation.
            Maximum limit for matrix size is {0} x {0} """.format(max_size))

    is_input_sparse = _is_sparse(matrix_in, in_args['val'])
    if is_input_sparse:
        validate_sparse(matrix_in, in_args)
        # sparse aggregate takes both dims plus 0-based (row, col, val) triples
        eigen_agg = """{schema_madlib}.__matrix_sparse_eigen(
                            {n_rows}::integer,
                            {n_rows}::integer,
                            ({in_args[row]}-1)::integer,
                            ({in_args[col]}-1)::integer,
                            ({in_args[val]})::double precision
                        )
                    """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                               in_args=in_args)
    else:
        validate_dense(matrix_in, in_args)
        # dense aggregate takes the row count plus 0-based row id and row array
        eigen_agg = """{schema_madlib}.__matrix_dense_eigen(
                            ({n_rows})::integer,
                            ({in_args[row]}-1)::integer,
                            ({in_args[val]})::double precision[]
                        )
                    """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                               in_args=in_args)
    # each per-row result holds (real, imag), which is packed into a complex
    plpy.execute("""
        CREATE TABLE {matrix_out} AS
        SELECT row_id as {out_args[row]},
               row(value[1], value[2])::{schema_madlib}.complex as {out_args[val]}
        FROM (
            SELECT row_id,
                   {schema_madlib}.get_row(res, row_id) as value
            FROM (
                SELECT {eigen_agg} as res
                FROM {matrix_in}
            ) q, generate_series(1, {dim[0]}) as row_id ) m
        """.format(**locals()))
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_cholesky(schema_madlib, matrix_in, in_args,
                    matrix_out_prefix, out_args):
    """ Compute the standard cholesky decomposition of a matrix.

    Cholesky decomposition of a matrix is typically a dense matrix, including
    sparse input matrices. Hence we always return a dense matrix result -
    out_args['fmt'] = 'sparse' ensures a sparsified output.

    Perform a robust Cholesky decomposition of a positive input matrix A, such
    that A=P^tLDL*P, where P is a permutation matrix, L is lower triangular with
    a unit diagonal and D is a diagonal matrix. C++ function returns
    [P, L, D] together, and we retrieve them respectively. NOTE: Because it is
    required that input matrix should be symmetric, it is only the lower
    triangular part that will be used for the decompositon. The upper triangular
    part won't be read.
    """
    _validate_input_table(matrix_in)
    _assert(matrix_out_prefix is not None and matrix_out_prefix.replace('"', '').strip() != '',
            "Matrix error: Invalid output prefix ({0})".format(matrix_out_prefix))
    matrix_p, matrix_l, matrix_d = (
        add_postfix(matrix_out_prefix, i) for i in ("_p", "_l", "_d"))
    for each_output in (matrix_p, matrix_l, matrix_d):
        _validate_output_table(each_output)
    in_args = parse_matrix_args(in_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': in_args['row'],
                                                  'col': in_args['col'],
                                                  'val': in_args['val']})
    dim = get_dims(matrix_in, in_args)
    _assert(min(dim) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    _assert(dim[0] == dim[1],
            "Matrix error: Cholesky decomposition operation is only defined for square matrices")
    # FIXME: currently matrix cholesky is computed on a single node after collating
    # a distributed matrix. This places a limit on the maximum size of the matrix
    # GPDB limit = 1GB implying size * size < 10^9 / 8 i.e. size <= 11100
    max_size = 11100
    _assert(dim[0] <= max_size,
            """Matrix error: Reached maximum limit for matrix cholesky operation.
            Maximum limit for matrix size is {0} x {0} """.format(max_size))

    out_fmt = None if 'fmt' not in out_args else out_args['fmt']
    is_input_sparse = _is_sparse(matrix_in, in_args['val'])
    # (local renamed from 'qr_agg' -- a copy-paste leftover from matrix_qr)
    if is_input_sparse:
        validate_sparse(matrix_in, in_args)
        cholesky_agg = """{schema_madlib}.__matrix_sparse_cholesky(
                            {n_rows}::integer,
                            {n_rows}::integer,
                            ({in_args[row]}-1)::integer,
                            ({in_args[col]}-1)::integer,
                            ({in_args[val]})::double precision
                        )
                       """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                                  in_args=in_args)
    else:
        validate_dense(matrix_in, in_args)
        cholesky_agg = """{schema_madlib}.__matrix_dense_cholesky(
                            ({n_rows})::integer,
                            ({in_args[row]}-1)::integer,
                            ({in_args[val]})::double precision[]
                        )
                       """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                                  in_args=in_args)
    # one aggregate call returns [P | L | D] concatenated row-wise;
    # materialize it once, then slice out the three factor matrices
    matrix_temp = "pg_temp." + unique_string()
    plpy.execute("""
        CREATE TABLE {matrix_temp} AS
        SELECT row_id,
               {schema_madlib}.get_row(res, row_id) as row_vec
        FROM (
            SELECT {cholesky_agg} as res
            FROM {matrix_in}
        ) q, generate_series(1, {dim[0]}) as row_id
        """.format(**locals()))
    is_output_sparse = out_fmt and out_fmt == 'sparse'
    # each factor occupies a dim[0]-wide slice of the concatenated row vector;
    # the three identical extract/sparsify/drop stanzas are now a single loop
    n = dim[0]
    factor_slices = ((matrix_p, "1:{0}".format(n)),
                     (matrix_l, "{0} + 1 : {0} * 2".format(n)),
                     (matrix_d, "{0} * 2 + 1: {0} * 3".format(n)))
    for matrix_target, slice_expr in factor_slices:
        matrix_target_temp = ("pg_temp." + unique_string()
                              if is_output_sparse else matrix_target)
        plpy.execute("""
            CREATE TABLE {matrix_target_temp} AS
            SELECT row_id AS {row_col},
                   row_vec[{slice_expr}] AS {val_col}
            FROM {matrix_temp}
            """.format(matrix_target_temp=matrix_target_temp,
                       slice_expr=slice_expr,
                       row_col=out_args['row'],
                       val_col=out_args['val'],
                       matrix_temp=matrix_temp))
        if is_output_sparse:
            matrix_sparsify(schema_madlib, matrix_target_temp, out_args,
                            matrix_target, out_args)
            plpy.execute('DROP TABLE IF EXISTS %s' % matrix_target_temp)
    plpy.execute('DROP TABLE IF EXISTS %s' % matrix_temp)
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_qr(schema_madlib, matrix_in, in_args, matrix_out_prefix, out_args):
    """ Compute the QR decomposition of a matrix.

    QR decomposition of a matrix is typically a dense matrix, including sparse input matrices.
    Hence we always return a dense matrix result - out_args['fmt'] = 'sparse'
    ensures a sparsified output.

    Performs a QR decomposition of a matrix A into matrices Q and R such that
    A = QR. The C++ function returns the result as [Q | R] concatenated
    row-wise; the two factors are sliced out into <prefix>_q and <prefix>_r.
    """
    _validate_input_table(matrix_in)
    _assert(matrix_out_prefix is not None and matrix_out_prefix.replace('"', '').strip() != '',
            "Matrix error: Invalid output prefix ({0})".format(matrix_out_prefix))
    matrix_q = add_postfix(matrix_out_prefix, "_q")
    matrix_r = add_postfix(matrix_out_prefix, "_r")
    _validate_output_table(matrix_q)
    _validate_output_table(matrix_r)

    in_args = parse_matrix_args(in_args)
    out_args = parse_matrix_args(out_args,
                                 in_default_args={'row': in_args['row'],
                                                  'col': in_args['col'],
                                                  'val': in_args['val']})
    dim = get_dims(matrix_in, in_args)
    _assert(min(dim) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    # FIXME: currently decomposition is computed on a single node after collating
    # a distributed matrix. This places a limit on the maximum size of the matrix
    # GPDB limit = 1GB implying size * size < 10^9 / 8 i.e. size <= 11100
    max_size = 11100
    # BUG FIX: this message previously said "matrix cholesky operation"
    # (copy-paste from matrix_cholesky)
    _assert(dim[0] <= max_size,
            """Matrix error: Reached maximum limit for matrix qr operation.
            Maximum limit for matrix size is {0} x {0} """.format(max_size))

    out_fmt = None if 'fmt' not in out_args else out_args['fmt']
    is_input_sparse = _is_sparse(matrix_in, in_args['val'])
    if is_input_sparse:
        validate_sparse(matrix_in, in_args)
        qr_agg = """{schema_madlib}.__matrix_sparse_qr(
                        {n_rows}::integer,
                        {n_cols}::integer,
                        ({in_args[row]}-1)::integer,
                        ({in_args[col]}-1)::integer,
                        ({in_args[val]})::double precision
                    )
                 """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                            n_cols=dim[1], in_args=in_args)
    else:
        validate_dense(matrix_in, in_args)
        qr_agg = """{schema_madlib}.__matrix_dense_qr(
                        ({n_rows})::integer,
                        ({in_args[row]}-1)::integer,
                        ({in_args[val]})::double precision[]
                    )
                 """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                            in_args=in_args)
    # materialize the concatenated [Q | R] result once
    matrix_temp = "pg_temp." + unique_string()
    plpy.execute("""
        CREATE TABLE {matrix_temp} AS
        SELECT row_id,
               {schema_madlib}.get_row(res, row_id) as row_vec
        FROM (
            SELECT {qr_agg} as res
            FROM {matrix_in}
        ) q, generate_series(1, {dim[0]}) as row_id
        """.format(**locals()))
    is_output_sparse = out_fmt and out_fmt == 'sparse'
    # Q occupies the first dim[0] columns, R the next dim[1] columns;
    # the two identical extract/sparsify/drop stanzas are now a single loop
    factor_slices = ((matrix_q, "1:{0}".format(dim[0])),
                     (matrix_r, "{0} + 1 : {0} + {1}".format(dim[0], dim[1])))
    for matrix_target, slice_expr in factor_slices:
        matrix_target_temp = ("pg_temp." + unique_string()
                              if is_output_sparse else matrix_target)
        plpy.execute("""
            CREATE TABLE {matrix_target_temp} AS
            SELECT row_id as {row_col},
                   row_vec[{slice_expr}] AS {val_col}
            FROM {matrix_temp}
            """.format(matrix_target_temp=matrix_target_temp,
                       slice_expr=slice_expr,
                       row_col=out_args['row'],
                       val_col=out_args['val'],
                       matrix_temp=matrix_temp))
        if is_output_sparse:
            matrix_sparsify(schema_madlib, matrix_target_temp, out_args,
                            matrix_target, out_args)
            plpy.execute('DROP TABLE IF EXISTS %s' % matrix_target_temp)
    plpy.execute('DROP TABLE IF EXISTS %s' % matrix_temp)
| # ------------------------------------------------------------------------------ |
| |
| |
def matrix_eval_helper(schema_madlib, matrix_in, in_args, dim, eval_funcs_info):
    """ Evaluate a scalar-valued matrix aggregate (e.g. rank, nuclear norm).

    Dispatches to the sparse or dense variant of the aggregate depending on
    the storage format of matrix_in, runs it over the whole table and returns
    the single scalar it produces.

    Args:
        schema_madlib: str. Schema containing MADlib functions
        matrix_in: str. Name of the input matrix table
        in_args: dict. Parsed options ('row', 'col', 'val') for matrix_in
        dim: list. Dimensions of the input matrix
        eval_funcs_info: dict. Aggregate names keyed by 'sparse_eval' and
                         'dense_eval'

    Returns:
        The scalar value computed by the aggregate.
    """
    if not _is_sparse(matrix_in, in_args['val']):
        validate_dense(matrix_in, in_args)
        eval_agg = """{schema_madlib}.{eval_func}(
                        ({n_rows})::integer,
                        ({in_args[row]}-1)::integer,
                        ({in_args[val]})::double precision[]
                    )
                """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                           in_args=in_args, eval_func=eval_funcs_info['dense_eval'])
    else:
        validate_sparse(matrix_in, in_args)
        # NOTE(review): both dimension arguments use dim[0] — verify this is
        # intended for non-square sparse input.
        eval_agg = """{schema_madlib}.{eval_func}(
                        {n_rows}::integer,
                        {n_rows}::integer,
                        ({in_args[row]}-1)::integer,
                        ({in_args[col]}-1)::integer,
                        ({in_args[val]})::double precision
                    )
                """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                           in_args=in_args, eval_func=eval_funcs_info['sparse_eval'])
    result = plpy.execute("""
        SELECT {eval_agg} as res FROM {matrix_in}
        """.format(eval_agg=eval_agg, matrix_in=matrix_in))
    _assert(result and len(result) == 1,
            "Matrix error: Invalid evaluation for matrix {0}".format(matrix_in))
    return result[0]['res']
# ------------------------------------------------------------------------------
| |
| |
def matrix_rank(schema_madlib, matrix_in, in_args):
    """ Compute the rank of a matrix.

    Args:
        schema_madlib: str. Schema containing MADlib functions
        matrix_in: str. Name of the input matrix table
        in_args: str. Name-value pair string of options for matrix_in

    Returns:
        The rank (number of linearly independent rows/columns) of matrix_in.
    """
    _validate_input_table(matrix_in)

    in_args = parse_matrix_args(in_args)
    dim = get_dims(matrix_in, in_args)
    _assert(min(dim) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    # FIXME: currently matrix rank is computed on a single node after collating
    # a distributed matrix. This places a limit on the maximum size of the matrix
    # GPDB limit = 1GB implying size * size < 10^9 / 8 i.e. size <= 11100
    max_size = 11100
    # Fixed copy-paste: this message used to say "matrix inverse operation".
    _assert(dim[0] <= max_size,
            """Matrix error: Reached maximum limit for matrix rank operation.
            Maximum limit for matrix size is {0} x {0} """.format(max_size))
    eval_funcs_info = {'sparse_eval': '__matrix_sparse_rank',
                       'dense_eval': '__matrix_dense_rank'}
    return matrix_eval_helper(schema_madlib, matrix_in, in_args, dim, eval_funcs_info)

# ------------------------------------------------------------------------------
| |
| |
def matrix_lu(schema_madlib, matrix_in, in_args,
              matrix_out_prefix, out_args):
    """ Compute the full pivoting LU decomposition of a matrix.

    Performs LU decomposition of a matrix with complete pivoting: the matrix A is
    decomposed as A = P^-1LUQ^-1, where L is unit-lower-triangular, U is upper-triangular,
    and P and Q are permutation matrices. The C++ aggregate returns the result
    packed as [P, L, U, Q]. Note that P, L, U, Q have different row number and
    column number if A is not a square matrix.

    The four factors are always computed in dense form (even for sparse
    input); setting out_args['fmt'] = 'sparse' sparsifies the outputs.

    Args:
        schema_madlib: str. Schema containing MADlib functions
        matrix_in: str. Name of the input matrix table
        in_args: str. Name-value pair string of options for matrix_in
        matrix_out_prefix: str. Prefix for the output tables; the factors are
                           stored in <prefix>_p, <prefix>_l, <prefix>_u, <prefix>_q
        out_args: str. Name-value pair string of options for the output tables

    Returns:
        None
    Side effect:
        Creates four output tables containing P, L, U and Q.
    """
    _validate_input_table(matrix_in)
    _assert(matrix_out_prefix is not None and matrix_out_prefix.replace('"', '').strip() != '',
            "Matrix error: Invalid output prefix ({0})".format(matrix_out_prefix))

    matrix_output_names = dict([(i, add_postfix(matrix_out_prefix, "_" + i))
                                for i in ("p", "l", "u", "q")])
    for each_output in matrix_output_names.values():
        _validate_output_table(each_output)

    in_args = parse_matrix_args(in_args)
    default_args = {'row': in_args['row'], 'col': in_args['col'], 'val': in_args['val']}
    out_args = parse_matrix_args(out_args, in_default_args=default_args)
    dim = get_dims(matrix_in, in_args)
    _assert(min(dim) > 0, "Matrix error: Invalid dimensions for input matrix")

    # FIXME: currently matrix full pivoting decomposition is computed on a single node
    # after collating a distributed matrix. This places a limit on the maximum size of
    # the matrix GPDB limit = 1GB implying size * size < 10^9 / 8 i.e. size <= 11100
    max_size = 11100
    # Fixed copy-paste: this message used to say "matrix inverse operation".
    _assert(dim[0] <= max_size,
            """Matrix error: Reached maximum limit for matrix LU decomposition operation.
            Maximum limit for matrix size is {0} x {0} """.format(max_size))

    out_fmt = None if 'fmt' not in out_args else out_args['fmt']
    is_input_sparse = _is_sparse(matrix_in, in_args['val'])
    if is_input_sparse:
        validate_sparse(matrix_in, in_args)
        # NOTE(review): both dimension arguments are dim[0] — verify this is
        # intended for non-square input (cf. the C++ aggregate signature).
        lu_agg = """{schema_madlib}.__matrix_sparse_lu(
                        {n_rows}::integer,
                        {n_rows}::integer,
                        ({in_args[row]}-1)::integer,
                        ({in_args[col]}-1)::integer,
                        ({in_args[val]})::double precision
                    )
                """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                           in_args=in_args)
    else:
        validate_dense(matrix_in, in_args)
        lu_agg = """{schema_madlib}.__matrix_dense_lu(
                        ({n_rows})::integer,
                        ({in_args[row]}-1)::integer,
                        ({in_args[val]})::double precision[]
                    )
                """.format(schema_madlib=schema_madlib, n_rows=dim[0],
                           in_args=in_args)
    is_output_sparse = out_fmt and out_fmt == 'sparse'
    matrix_temp = "pg_temp." + unique_string()

    row_count = max(dim)
    # number of rows in result is determined by the max(dim)
    plpy.execute("""
        CREATE TABLE {matrix_temp} AS
            SELECT row_id,
                    {schema_madlib}.get_row(res, row_id) as row_vec
            FROM (
                SELECT {lu_agg} as res
                FROM {matrix_in}
            ) q, generate_series(1, {row_count}) as row_id
        """.format(**locals()))

    # When output is sparse, write each factor to a temp table first; it is
    # sparsified into its final table below.
    matrix_temp_names = dict([
        (k, 'pg_temp.' + unique_string() if is_output_sparse else v)
        for k, v in matrix_output_names.items()])
    # Factors are packed column-wise in row_vec as [P | L | U | Q]:
    #   P: columns 1 .. dim[0]
    #   L: columns dim[0]+1 .. 2*dim[0]
    #   U: columns 2*dim[0]+1 .. 2*dim[0]+dim[1]
    #   Q: columns 2*dim[0]+dim[1]+1 .. 2*dim[0]+2*dim[1]
    plpy.execute("""
        CREATE TABLE {matrix_temp_names[p]} AS
            SELECT row_id AS {out_args[row]},
                   row_vec[1:{dim[0]}] AS {out_args[val]}
            FROM {matrix_temp} WHERE row_id <= {dim[0]}
        """.format(**locals()))
    plpy.execute("""
        CREATE TABLE {matrix_temp_names[q]} AS
            SELECT row_id AS {out_args[row]},
                   row_vec[2 * {dim[0]} + {dim[1]} + 1:2 * {dim[0]} + 2 * {dim[1]}] AS {out_args[val]}
            FROM {matrix_temp} WHERE row_id <= {dim[1]}
        """.format(**locals()))
    plpy.execute("""
        CREATE TABLE {matrix_temp_names[l]} AS
            SELECT row_id AS {out_args[row]},
                   row_vec[{dim[0]} + 1 : 2 * {dim[0]}] AS {out_args[val]}
            FROM {matrix_temp} WHERE row_id <= {dim[0]}
        """.format(**locals()))
    plpy.execute("""
        CREATE TABLE {matrix_temp_names[u]} AS
            SELECT row_id AS {out_args[row]},
                   row_vec[2 * {dim[0]} + 1 : 2 * {dim[0]} + {dim[1]}] AS {out_args[val]}
            FROM {matrix_temp} WHERE row_id <= {dim[0]}
        """.format(**locals()))

    if is_output_sparse:
        # Pair temp and output tables by factor key instead of zipping the
        # .values() of two dicts, which relies on matching iteration order.
        for factor, temp_table in matrix_temp_names.items():
            matrix_sparsify(schema_madlib, temp_table, out_args,
                            matrix_output_names[factor], out_args)
            plpy.execute('DROP TABLE IF EXISTS %s' % temp_table)

    plpy.execute('DROP TABLE IF EXISTS %s' % matrix_temp)
# ------------------------------------------------------------------------------
| |
| |
def matrix_nuclear_norm(schema_madlib, matrix_in, in_args):
    """ Compute the nuclear norm (sum of singular values) of a matrix.

    Args:
        schema_madlib: str. Schema containing MADlib functions
        matrix_in: str. Name of the input matrix table
        in_args: str. Name-value pair string of options for matrix_in

    Returns:
        The nuclear norm of matrix_in as a scalar.
    """
    _validate_input_table(matrix_in)

    in_args = parse_matrix_args(in_args)
    dim = get_dims(matrix_in, in_args)
    _assert(min(dim) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    # FIXME: currently the nuclear norm is computed on a single node after collating
    # a distributed matrix. This places a limit on the maximum size of the matrix
    # GPDB limit = 1GB implying size * size < 10^9 / 8 i.e. size <= 11100
    max_size = 11100
    # Fixed copy-paste: this message used to say "matrix inverse operation".
    _assert(dim[0] <= max_size,
            """Matrix error: Reached maximum limit for matrix nuclear norm operation.
            Maximum limit for matrix size is {0} x {0} """.format(max_size))
    eval_funcs_info = {'sparse_eval': '__matrix_sparse_nuclear_norm',
                       'dense_eval': '__matrix_dense_nuclear_norm'}
    return matrix_eval_helper(schema_madlib, matrix_in, in_args, dim, eval_funcs_info)
# ------------------------------------------------------------------------------
| |
| |
def matrix_pinv(schema_madlib, matrix_in, in_args,
                matrix_out, out_args):
    """ Compute the generalized (Moore-Penrose) inverse of a matrix.

    The result is always computed in dense form, including for sparse input
    matrices; setting out_args['fmt'] = 'sparse' sparsifies the output.

    Args:
        schema_madlib: str. Schema containing MADlib functions
        matrix_in: str. Name of the input matrix table
        in_args: str. Name-value pair string of options for matrix_in
        matrix_out: str. Name of the output matrix table
        out_args: str. Name-value pair string of options for matrix_out

    Returns:
        None
    Side effect:
        Creates matrix_out containing the pseudo-inverse.
    """
    _validate_input_table(matrix_in)
    _validate_output_table(matrix_out)

    in_args = parse_matrix_args(in_args)
    default_args = {'row': in_args['row'],
                    'col': in_args['col'],
                    'val': in_args['val']}
    out_args = parse_matrix_args(out_args, in_default_args=default_args)
    dim = get_dims(matrix_in, in_args)
    _assert(min(dim) > 0,
            "Matrix error: Invalid dimensions for input matrix")
    # FIXME: currently matrix inverse is computed on a single node after collating
    # a distributed matrix. This places a limit on the maximum size of the matrix
    # GPDB limit = 1GB implying size * size < 10^9 / 8 i.e. size <= 11100
    max_size = 11100
    _assert(dim[0] <= max_size,
            """Matrix error: Reached maximum limit for matrix inverse operation.
            Maximum limit for matrix size is {0} x {0} """.format(max_size))
    agg_funcs_info = {'dense_agg': '__matrix_dense_pinv',
                      'sparse_agg': '__matrix_sparse_pinv'}
    matrix_transform_helper(schema_madlib, matrix_in, in_args, matrix_out,
                            out_args, dim, agg_funcs_info)
# ------------------------------------------------------------------------------
| |
| # ----------------------------------------------------------------------- |
| # Creation operations |
| # ----------------------------------------------------------------------- |
| |
| |
def _matrix_diag_sparse(schema_madlib, diag_elements, matrix_out, out_args):
    """ Build a sparse-format diagonal matrix from a vector of diagonal values.

    Zero-valued entries are omitted, except that the last diagonal position is
    always written so the matrix dimensionality can be recovered from the table.
    """
    matrix_dim = len(diag_elements)
    plpy.execute("""
        CREATE TABLE {matrix_out}
        ({out_args[row]} INTEGER, {out_args[col]} INTEGER, {out_args[val]} FLOAT8)
        m4_ifdef(`__POSTGRESQL__', `',
                 `WITH (APPENDONLY=TRUE) DISTRIBUTED BY ({out_args[row]})');
        """.format(matrix_out=matrix_out, out_args=out_args))
    insert_plan = plpy.prepare("""
        INSERT INTO
            {matrix_out}
        SELECT
            num as {out_args[row]},
            num as {out_args[col]},
            val as {out_args[val]}
        FROM (
            SELECT
                unnest($1) as val,
                generate_series(1, {matrix_dim}) as num
        ) l1
        WHERE val <> 0.0 or num = {matrix_dim}
        """.format(matrix_out=matrix_out, out_args=out_args,
                   matrix_dim=matrix_dim), ["FLOAT8[]"])
    plpy.execute(insert_plan, [diag_elements])
| |
| |
def _matrix_diag_dense(schema_madlib, diag_elements, matrix_out, out_args):
    """ Build a dense-format diagonal matrix from a vector of diagonal values.

    Each output row holds the full row vector, densified from the single
    diagonal entry at that row's position.
    """
    dim = len(diag_elements)
    create_plan = plpy.prepare("""
        CREATE TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
                 `WITH (APPENDONLY=TRUE)') AS
        SELECT
            row as {out_args[row]},
            {schema_madlib}.__matrix_densify_agg({dim}, row, val) AS {out_args[val]}
        FROM (
            SELECT
                unnest($1) AS val,
                generate_series(1, {dim}) AS row
        ) l
        GROUP BY l.row
        m4_ifdef(`__POSTGRESQL__', `',
                 `DISTRIBUTED BY ({out_args[row]})')
        """.format(matrix_out=matrix_out, out_args=out_args,
                   schema_madlib=schema_madlib, dim=dim), ["FLOAT8[]"])
    plpy.execute(create_plan, [diag_elements])
| |
| |
def matrix_diag(schema_madlib, diag_elements, matrix_out, out_args):
    """ Create a diagonal matrix with the given vector on the main diagonal.
    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param diag_elements: array, Vector containing the diagonal elements.
                              Requires not NULL, not empty, no NULL elements
        @param matrix_out: str, Name of the table to store result diagonal matrix
        @param out_args: str, Name-value pair string containing options for matrix_out

    Returns:
        None
    Side effect:
        Creates an output table containing the result diagonal matrix
    """
    _validate_input_array(diag_elements)
    _validate_output_table(matrix_out)
    default_names = {'row': 'row', 'col': 'col', 'val': 'val'}
    out_args = parse_matrix_args(out_args, in_default_args=default_names)
    # Sparse output is the default when no explicit format was requested.
    if out_args.get('fmt', 'sparse') == 'sparse':
        builder = _matrix_diag_sparse
    else:
        builder = _matrix_diag_dense
    builder(schema_madlib, diag_elements, matrix_out, out_args)
# ------------------------------------------------------------------------------
| |
| |
def matrix_identity(schema_madlib, dim, matrix_out, out_args):
    """ Create a dim x dim identity matrix.
    Args:
        @param schema_madlib: str, Name of the schema containing madlib functions
        @param dim: integer, Dimensionality of the identity matrix.
                    Requires not NULL, dim > 0
        @param matrix_out: str, Name of the table to store result identity matrix
        @param out_args: str, Name-value pair string containing options for matrix_out

    Returns:
        None
    Side effect:
        Creates an output table containing the result identity matrix
    """
    _assert(dim and dim > 0,
            "Invalid input for dimensionality of matrix")
    # An identity matrix is a diagonal matrix of ones.
    matrix_diag(schema_madlib, [1] * dim, matrix_out, out_args)
# ------------------------------------------------------------------------------
| |
| |
def _matrix_create_scalar(schema_madlib, row_dim, col_dim, matrix_out, out_args, scalar):
    """ Create a row_dim by col_dim matrix with every element set to scalar.

    Args:
        @param schema_madlib: str, Name of the schema containing madlib fuctions
        @param row_dim: int, The number of rows of matrix
        @param col_dim: int, The number of columns of matrix
        @param matrix_out: str, Name of the table to store the result matrix
        @param out_args: str, Name-value pair string containing options for matrix_out
        @param scalar: float, Value of elements in matrix_out

    Returns:
        None
    """
    _validate_output_table(matrix_out)
    _assert(row_dim > 0 and col_dim > 0,
            "Matrix error: Invalid dimensions for input matrices: {0} x {1}".
            format(row_dim, col_dim))
    _assert(scalar is not None,
            "Matrix error: Invalid none value for elements.")

    out_args = parse_matrix_args(
        out_args, in_default_args={'row': 'row', 'col': 'col', 'val': 'val'})

    if 'fmt' not in out_args or out_args['fmt'] == 'sparse':
        if scalar != 0:
            plpy.execute("""
                CREATE TABLE {matrix_out}
                m4_ifdef(`__POSTGRESQL__', `',
                         `WITH (APPENDONLY=TRUE)') AS
                SELECT
                    {out_args[row]},
                    {out_args[col]},
                    {scalar}::FLOAT8 as {out_args[val]}
                FROM
                    generate_series(1, {row_dim}) {out_args[row]},
                    generate_series(1, {col_dim}) {out_args[col]}
                m4_ifdef(`__POSTGRESQL__', `',
                         `DISTRIBUTED BY ({out_args[row]})')
                """.format(**locals()))
        else:
            # Sparse all-zero matrix: store a single zero entry at
            # (row_dim, col_dim) so the dimensionality is recoverable.
            plpy.execute("""
                CREATE TABLE {matrix_out}
                m4_ifdef(`__POSTGRESQL__', `',
                         `WITH (APPENDONLY=TRUE)') AS
                SELECT {row_dim} as {out_args[row]},
                       {col_dim} as {out_args[col]},
                       0::FLOAT8 as {out_args[val]}
                m4_ifdef(`__POSTGRESQL__', `',
                         `DISTRIBUTED BY ({out_args[row]})')
                """.format(**locals()))
    else:
        # Fixed: use the schema_madlib parameter rather than a hard-coded
        # "madlib." schema, which broke installs under any other schema name.
        plpy.execute("""
            CREATE TABLE {matrix_out}
            m4_ifdef(`__POSTGRESQL__', `',
                     `WITH (APPENDONLY=TRUE)') AS
            SELECT
                row as {out_args[row]},
                {schema_madlib}.array_fill({schema_madlib}.array_of_float({col_dim}),
                                  {scalar}::FLOAT8) as {out_args[val]}
            FROM
                generate_series(1, {row_dim}) as row
            m4_ifdef(`__POSTGRESQL__', `',
                     `DISTRIBUTED BY ({out_args[row]})')
            """.format(**locals()))
| |
| |
def matrix_zeros(schema_madlib, row_dim, col_dim, matrix_out, out_args):
    """ Create a row_dim by col_dim matrix with every element set to 0.
    """
    _matrix_create_scalar(schema_madlib, row_dim, col_dim,
                          matrix_out, out_args, scalar=0)
| |
| |
def matrix_ones(schema_madlib, row_dim, col_dim, matrix_out, out_args):
    """ Create a row_dim by col_dim matrix with every element set to 1.
    """
    _matrix_create_scalar(schema_madlib, row_dim, col_dim,
                          matrix_out, out_args, scalar=1)
| |
| |
def matrix_random(schema_madlib, distribution, row_dim, col_dim,
                  in_args, matrix_out, out_args):
    """ Generate a row_dim x col_dim random matrix sampled from given distribution

    Args:
        schema_madlib: MADlib schema name
        distribution: Str, Name of the sampling distribution.
                      Supported names: uniform, normal, bernoulli.
                      A unique prefix (e.g. 'norm') is accepted; None/empty
                      defaults to 'uniform'.
        row_dim: int, Target matrix row dimensionality
        col_dim: int, Target matrix col dimensionality
        in_args: str, Distribution parameters in key=value pairs.
                 Supported parameters:
                     Normal: mu, sigma
                     Uniform: min, max
                     Bernoulli: lower, upper, prob
                 Common: seed (int), temp_out (bool; create a TEMP table)
        matrix_out: str, Name of the table to store the generated matrix
        out_args: str, Name-value pair string containing options for matrix_out

    Returns:
        None
    Side effect:
        Creates matrix_out (dense format: one row vector per row id).
    """
    _validate_output_table(matrix_out)
    _assert(row_dim > 0 and col_dim > 0,
            "Matrix error: Invalid dimensions for input matrices: {0} x {1}".
            format(row_dim, col_dim))

    out_args = parse_matrix_args(
        out_args, in_default_args={'row': 'row', 'col': 'col', 'val': 'val'})

    supported_dist = ['uniform', 'normal', 'bernoulli']
    if distribution:
        try:
            # Accept abbreviations: the first supported name that starts with
            # the given string wins (e.g. 'norm' -> 'normal').
            distribution = next(x for x in supported_dist
                                if x.startswith(distribution))
        except StopIteration:
            # next() returns a StopIteration if no element found
            plpy.error("Matrix Error: Invalid distribution: "
                       "{0}. Supported distributions: ({1})"
                       .format(distribution, ', '.join(sorted(supported_dist))))
    else:
        distribution = 'uniform'

    # Common options; the seed default is drawn once per call, so repeated
    # calls without an explicit seed produce different matrices.
    in_args_default = {'seed': randint(0, 1000), 'temp_out': False}
    in_args_types = {'seed': int, 'temp_out': bool}
    # Build the per-row random-vector SQL expression for the chosen
    # distribution. The seed is offset by the row number so that each row
    # gets a distinct, reproducible stream.
    if distribution == 'normal':
        in_args_default.update({'mu': 0, 'sigma': 1})
        in_args_types.update({'mu': float, 'sigma': float})
        in_args_vals = extract_keyvalue_params(in_args, in_args_types, in_args_default)
        random_vector = ("""{schema_madlib}.__normal_vector(
                                {col_dim},
                                {in_args_vals[mu]},
                                {in_args_vals[sigma]},
                                {in_args_vals[seed]} + row)
                         """.format(**locals()))
    elif distribution == 'bernoulli':
        in_args_default.update({'upper': 1., 'lower': 0., 'prob': 0.5})
        in_args_types.update({'upper': float, 'lower': float, 'prob': float})
        in_args_vals = extract_keyvalue_params(in_args, in_args_types, in_args_default)
        random_vector = ("""{schema_madlib}.__bernoulli_vector(
                                {col_dim},
                                {in_args_vals[upper]},
                                {in_args_vals[lower]},
                                {in_args_vals[prob]},
                                {in_args_vals[seed]} + row)
                         """.format(**locals()))
    elif distribution == 'uniform':
        in_args_default.update({'min': 0, 'max': 1})
        in_args_types.update({'min': float, 'max': float})
        in_args_vals = extract_keyvalue_params(in_args, in_args_types, in_args_default)
        random_vector = ("""{schema_madlib}.__uniform_vector(
                                {col_dim},
                                {in_args_vals[min]},
                                {in_args_vals[max]},
                                {in_args_vals[seed]} + row)
                         """.format(**locals()))
    else:
        # Should be unreachable: distribution was normalized to a supported
        # name above. Kept as a defensive guard.
        plpy.error("Matrix Error: Invalid distribution: "
                   "{0}. Supported distributions: ({1})"
                   .format(distribution, ', '.join(sorted(supported_dist))))

    plpy.execute("""
        CREATE {temp_str} TABLE {matrix_out}
        m4_ifdef(`__POSTGRESQL__', `',
                 `WITH (APPENDONLY=TRUE)') AS
        SELECT
            row as {out_args[row]},
            {random_vector} as {out_args[val]}
        FROM
            generate_series(1, {row_dim}) as row
        m4_ifdef(`__POSTGRESQL__', `',
                 `DISTRIBUTED BY ({out_args[row]})')
        """.format(temp_str="TEMP" if in_args_vals['temp_out'] else "",
                   **locals()))