| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| # Single Source Shortest Path |
| |
| # Please refer to the sssp.sql_in file for the documentation |
| |
| """ |
| @file sssp.py_in |
| |
| @namespace graph |
| """ |
| |
| import plpy |
| from graph_utils import * |
| from utilities.control import MinWarning |
| from utilities.utilities import _assert |
| from utilities.utilities import extract_keyvalue_params |
| from utilities.utilities import unique_string |
| from utilities.utilities import _string_to_array |
| from utilities.utilities import split_quoted_delimited_str |
| from utilities.validate_args import table_exists |
| from utilities.validate_args import columns_exist_in_table |
| from utilities.validate_args import table_is_empty |
| from utilities.validate_args import get_cols_and_types |
| from utilities.validate_args import get_expr_type |
| |
| m4_changequote(`<!', `!>') |
| |
| |
| def _check_groups(tbl1, tbl2, grp_list): |
| |
| """ |
| Helper function for joining tables with groups. |
| Args: |
| @param tbl1 Name of the first table |
| @param tbl2 Name of the second table |
| @param grp_list The list of grouping columns |
| """ |
| |
| return ' AND '.join([" {tbl1}.{i} = {tbl2}.{i} ".format(**locals()) |
| for i in grp_list]) |
| |
| def _grp_from_table(tbl, grp_list): |
| |
| """ |
| Helper function for selecting grouping columns of a table |
| Args: |
| @param tbl Name of the table |
| @param grp_list The list of grouping columns |
| """ |
| return ' , '.join([" {tbl}.{i} ".format(**locals()) |
| for i in grp_list]) |
| |
def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
        edge_args, source_vertex, out_table, grouping_cols, **kwargs):

    """
    Single source shortest path function for graphs using the Bellman-Ford
    algorithm [1].
    Args:
        @param vertex_table    Name of the table that contains the vertex data.
        @param vertex_id       Name of the column containing the vertex ids.
        @param edge_table      Name of the table that contains the edge data.
        @param edge_args       A comma-delimited string containing multiple
                               named arguments of the form "name=value".
        @param source_vertex   The source vertex id for the algorithm to start.
        @param out_table       Name of the table to store the result of SSSP.
        @param grouping_cols   The list of grouping columns.

    Creates 'out_table' with one row per (group and) vertex, holding the
    minimum path weight from the source and the vertex's parent on that
    path, plus a companion table '<out_table>_summary' recording the call
    parameters for later use by graph_sssp_get_path.

    [1] https://en.wikipedia.org/wiki/Bellman-Ford_algorithm
    """

    with MinWarning("warning"):

        # Sentinel "not reached yet" distances: INT_MAX for integer weights,
        # 'Infinity' (a valid PostgreSQL float literal) for float weights.
        # The choice is made below based on the weight column's type.
        INT_MAX = 2147483647
        INFINITY = "'Infinity'"
        # Tolerance for float weight comparison; '=' on floats is unreliable.
        EPSILON = 0.000001

        # Session-unique names for the scratch tables.
        message = unique_string(desp='message')

        oldupdate = unique_string(desp='oldupdate')
        newupdate = unique_string(desp='newupdate')

        params_types = {'src': str, 'dest': str, 'weight': str}
        default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
        edge_params = extract_keyvalue_params(edge_args,
                                              params_types,
                                              default_args)

        # Prepare the input for recording in the summary table.
        # The string 'NULL' marks parameters the caller did not supply.
        if vertex_id is None:
            v_st= "NULL"
            vertex_id = "id"
        else:
            v_st = vertex_id
        if edge_args is None:
            e_st = "NULL"
        else:
            e_st = edge_args
        if grouping_cols is None:
            g_st = "NULL"
            glist = None
        else:
            g_st = grouping_cols
            glist = split_quoted_delimited_str(grouping_cols)

        src = edge_params["src"]
        dest = edge_params["dest"]
        weight = edge_params["weight"]

        # DISTRIBUTED BY clauses apply only on Greenplum/HAWQ builds; they
        # expand to empty strings on PostgreSQL (resolved by m4 at build time).
        distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
            <!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
        local_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
            <!"DISTRIBUTED BY (id)"!>)

        is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
        _validate_sssp(vertex_table, vertex_id, edge_table,
            edge_params, source_vertex, out_table, glist)

        plpy.execute(" DROP TABLE IF EXISTS {0},{1},{2}".format(
            message,oldupdate,newupdate))

        # Initialize grouping related variables.
        # Suffix key for the join predicates built below:
        #   _oo: out_table vs oldupdate,  _eo: edge_table vs oldupdate,
        #   _ex: edge_table vs subquery x, _om: out_table vs message.
        comma_grp = ""
        comma_grp_e = ""
        comma_grp_m = ""
        grp_comma = ""
        checkg_oo = ""
        checkg_eo = ""
        checkg_ex = ""
        checkg_om = ""
        group_by = ""

        if grouping_cols is not None:
            comma_grp = " , " + grouping_cols
            group_by = " , " + _grp_from_table(edge_table,glist)
            comma_grp_e = " , " + _grp_from_table(edge_table,glist)
            comma_grp_m = " , " + _grp_from_table("message",glist)
            grp_comma = grouping_cols + " , "

            checkg_oo_sub = _check_groups(out_table,"oldupdate",glist)
            checkg_oo = " AND " + checkg_oo_sub
            checkg_eo = " AND " + _check_groups(edge_table,"oldupdate",glist)
            checkg_ex = " AND " + _check_groups(edge_table,"x",glist)
            checkg_om = " AND " + _check_groups("out_table","message",glist)

        # Pick the "unreachable" initial weight based on the weight type.
        w_type = get_expr_type(weight,edge_table).lower()
        init_w = INT_MAX
        if w_type in ['double precision','float8']:
            init_w = INFINITY

        # We keep a table of every vertex, the minimum cost to that destination
        # seen so far and the parent to this vertex in the associated shortest
        # path. This table will be updated throughout the execution.
        # (LIMIT 0 copies only the column definitions from edge_table.)
        plpy.execute(
            """ CREATE TABLE {out_table} AS ( SELECT
                {grp_comma} {src} AS {vertex_id}, {weight},
                {src} AS parent FROM {edge_table} LIMIT 0)
            {distribution} """.format(**locals()))

        # We keep a summary table to keep track of the parameters used for this
        # SSSP run. This table is used in the path finding function to eliminate
        # the need for repetition.
        plpy.execute( """ CREATE TABLE {out_table}_summary (
            vertex_table            TEXT,
            vertex_id               TEXT,
            edge_table              TEXT,
            edge_args               TEXT,
            source_vertex           INTEGER,
            out_table               TEXT,
            grouping_cols           TEXT)
            """.format(**locals()))
        plpy.execute( """ INSERT INTO {out_table}_summary VALUES
            ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
            {source_vertex}, '{out_table}', '{g_st}')
            """.format(**locals()))

        # We keep 2 update tables and alternate them during the execution.
        # This is necessary since we need to know which vertices are updated in
        # the previous iteration to calculate the next set of updates.
        plpy.execute(
            """ CREATE TEMP TABLE {oldupdate} AS ( SELECT
                {src} AS id, {weight},
                {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
            {local_distribution}
            """.format(**locals()))
        plpy.execute(
            """ CREATE TEMP TABLE {newupdate} AS ( SELECT
                {src} AS id, {weight},
                {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
            {local_distribution}
            """.format(**locals()))

        # Since HAWQ does not allow us to update, we create a new table and
        # rename at every iteration.
        if is_hawq:
            temp_table = unique_string(desp='temp')
            sql =""" CREATE TABLE {temp_table} AS ( SELECT * FROM {out_table} )
                {distribution} """
            plpy.execute(sql.format(**locals()))

        # GPDB and HAWQ have distributed by clauses to help them with indexing.
        # For Postgres we add the indices manually.
        sql_index = m4_ifdef(<!__POSTGRESQL__!>,
            <!""" CREATE INDEX ON {out_table} ({vertex_id});
            CREATE INDEX ON {oldupdate} (id);
            CREATE INDEX ON {newupdate} (id);
            """.format(**locals())!>,
            <!''!>)
        plpy.execute(sql_index)

        # The initialization step is quite different when grouping is involved
        # since not every group (subgraph) will have the same set of vertices.
        # In that case the vertex set of each group is derived from the edges
        # of that group rather than from vertex_table.

        # Example:
        # Assume there are two grouping columns g1 and g2
        # g1 values are 0 and 1. g2 values are 5 and 6
        if grouping_cols is not None:

            distinct_grp_table = unique_string(desp='grp')
            plpy.execute(""" DROP TABLE IF EXISTS {distinct_grp_table} """.
                format(**locals()))
            plpy.execute( """ CREATE TEMP TABLE {distinct_grp_table} AS
                SELECT DISTINCT {grouping_cols} FROM {edge_table} """.
                format(**locals()))
            subq = unique_string(desp='subquery')

            checkg_ds_sub = _check_groups(distinct_grp_table,subq,glist)
            grp_d_comma = _grp_from_table(distinct_grp_table,glist) +","

            # Every vertex appearing as a source or destination in a group's
            # edges starts at the "unreachable" weight with no parent.
            plpy.execute(
                """ INSERT INTO {out_table}
                    SELECT {grp_d_comma} {vertex_id} AS {vertex_id},
                        {init_w} AS {weight}, NULL::INT AS parent
                    FROM {distinct_grp_table} INNER JOIN
                    (
                        SELECT {src} AS {vertex_id} {comma_grp}
                        FROM {edge_table}
                        UNION
                        SELECT {dest} AS {vertex_id} {comma_grp}
                        FROM {edge_table}
                    ) {subq} ON ({checkg_ds_sub})
                    WHERE {vertex_id} IS NOT NULL
                """.format(**locals()))

            # Seed every group: the source vertex is reachable at cost 0
            # with itself as parent.
            plpy.execute(
                """ INSERT INTO {oldupdate}
                    SELECT {source_vertex}, 0, {source_vertex},
                        {grouping_cols}
                    FROM {distinct_grp_table}
                """.format(**locals()))

            # The maximum number of vertices for any group.
            # Used for determining negative cycles.
            v_cnt = plpy.execute(
                """ SELECT max(count) as max FROM (
                        SELECT count({vertex_id}) AS count
                        FROM {out_table}
                        GROUP BY {grouping_cols}) x
                """.format(**locals()))[0]['max']
            plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
        else:
            plpy.execute(
                """ INSERT INTO {out_table}
                    SELECT {vertex_id} AS {vertex_id},
                        {init_w} AS {weight},
                        NULL AS parent
                    FROM {vertex_table}
                    WHERE {vertex_id} IS NOT NULL
                """.format(**locals()))

            # The source can be reached with 0 cost and it has itself as the
            # parent.
            plpy.execute(
                """ INSERT INTO {oldupdate}
                    VALUES({source_vertex},0,{source_vertex})
                """.format(**locals()))

            v_cnt = plpy.execute(
                """ SELECT count(*) FROM {vertex_table}
                    WHERE {vertex_id} IS NOT NULL
                """.format(**locals()))[0]['count']

        # Bellman-Ford converges in at most |V|-1 relaxation rounds; the
        # extra pass (i == v_cnt) can only produce updates if the graph
        # contains a negative cycle, which is checked after the loop.
        for i in range(0,v_cnt+1):

            # Apply the updates calculated in the last iteration.
            if is_hawq:
                # HAWQ cannot UPDATE: rebuild the whole result into
                # temp_table (unchanged rows UNION updated rows) and rename
                # it over out_table.
                sql = """
                    TRUNCATE TABLE {temp_table};
                    INSERT INTO {temp_table}
                        SELECT *
                        FROM {out_table}
                        WHERE NOT EXISTS (
                            SELECT 1
                            FROM {oldupdate} as oldupdate
                            WHERE {out_table}.{vertex_id} = oldupdate.id
                                {checkg_oo})
                        UNION
                        SELECT {grp_comma} id, {weight}, parent FROM {oldupdate};
                    """
                plpy.execute(sql.format(**locals()))
                plpy.execute("DROP TABLE {0}".format(out_table))
                plpy.execute("ALTER TABLE {0} RENAME TO {1}".
                    format(temp_table,out_table))
                sql = """ CREATE TABLE {temp_table} AS (
                        SELECT * FROM {out_table} LIMIT 0)
                    {distribution};"""
                plpy.execute(sql.format(**locals()))
                # Probe whether any update was pending; mirrors the UPDATE
                # row count used in the non-HAWQ branch.
                ret = plpy.execute("SELECT id FROM {0} LIMIT 1".
                    format(oldupdate))
            else:
                sql = """
                    UPDATE {out_table} SET
                        {weight}=oldupdate.{weight},
                        parent=oldupdate.parent
                    FROM
                        {oldupdate} AS oldupdate
                    WHERE
                        {out_table}.{vertex_id}=oldupdate.id AND
                        {out_table}.{weight}>oldupdate.{weight} {checkg_oo}
                    """
                ret = plpy.execute(sql.format(**locals()))

            # No vertex changed in the last round: the algorithm converged.
            if ret.nrows() == 0:
                break

            plpy.execute("TRUNCATE TABLE {0}".format(newupdate))

            # 'oldupdate' table has the update info from the last iteration

            # Consider every edge that has an updated source
            # From these edges:
            # For every destination vertex, find the min total cost to reach.
            # Note that, just calling an aggregate function with group by won't
            # let us store the src field of the edge (needed for the parent).
            # This is why we need the 'x'; it gives a list of destinations and
            # associated min values. Using these values, we identify which edge
            # is selected.

            # Since using '=' with floats is dangerous we use an epsilon value
            # for comparison.

            # Once we have a list of edges and values (stored as 'message'),
            # we check if these values are lower than the existing shortest
            # path values.

            sql = (""" INSERT INTO {newupdate}
                SELECT DISTINCT ON (message.id {comma_grp})
                    message.id AS id,
                    message.{weight} AS {weight},
                    message.parent AS parent {comma_grp_m}
                FROM {out_table} AS out_table INNER JOIN
                    (
                        SELECT {edge_table}.{dest} AS id, x.{weight} AS {weight},
                            oldupdate.id AS parent {comma_grp_e}
                        FROM {oldupdate} AS oldupdate INNER JOIN
                            {edge_table} ON
                            ({edge_table}.{src} = oldupdate.id {checkg_eo})
                            INNER JOIN
                            (
                                SELECT {edge_table}.{dest} AS id,
                                    min(oldupdate.{weight} +
                                        {edge_table}.{weight}) AS {weight} {comma_grp_e}
                                FROM {oldupdate} AS oldupdate INNER JOIN
                                    {edge_table} ON
                                    ({edge_table}.{src}=oldupdate.id {checkg_eo})
                                GROUP BY {edge_table}.{dest} {comma_grp_e}
                            ) x
                            ON ({edge_table}.{dest} = x.id {checkg_ex} )
                        WHERE ABS(oldupdate.{weight} + {edge_table}.{weight}
                            - x.{weight}) < {EPSILON}
                    ) message
                    ON (message.id = out_table.{vertex_id} {checkg_om})
                WHERE message.{weight}<out_table.{weight}
                """.format(**locals()))

            plpy.execute(sql)

            # Swap the update tables for the next iteration.
            tmp = oldupdate
            oldupdate = newupdate
            newupdate = tmp

        plpy.execute("DROP TABLE IF EXISTS {0}".format(newupdate))
        # The algorithm should converge in less than |V| iterations.
        # Otherwise there is a negative cycle in the graph.
        # (i keeps its value from the loop; i == v_cnt means the extra pass
        # still produced updates.)
        if i == v_cnt:
            if grouping_cols is None:
                plpy.execute("DROP TABLE IF EXISTS {0},{1},{2}".
                    format(out_table, out_table+"_summary", oldupdate))
                if is_hawq:
                    plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_table))
                plpy.error("Graph SSSP: Detected a negative cycle in the graph.")

            # It is possible that not all groups has negative cycles.
            else:

                # grp is the string created by collating grouping columns.
                # By looking at the oldupdate table we can see which groups
                # are in a negative cycle.

                negs = plpy.execute(
                    """ SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
                        FROM {oldupdate}
                    """.format(**locals()))[0]['grp']

                # Delete the groups with negative cycles from the output table.
                if is_hawq:
                    # HAWQ cannot DELETE either; rebuild without the bad
                    # groups and rename.
                    sql_del = """
                        TRUNCATE TABLE {temp_table};
                        INSERT INTO {temp_table}
                            SELECT *
                            FROM {out_table}
                            WHERE NOT EXISTS(
                                SELECT 1
                                FROM {oldupdate} as oldupdate
                                WHERE {checkg_oo_sub}
                                );"""
                    plpy.execute(sql_del.format(**locals()))
                    plpy.execute("DROP TABLE {0}".format(out_table))
                    plpy.execute("ALTER TABLE {0} RENAME TO {1}".
                        format(temp_table,out_table))
                else:
                    sql_del = """ DELETE FROM {out_table}
                        USING {oldupdate} AS oldupdate
                        WHERE {checkg_oo_sub}"""
                    plpy.execute(sql_del.format(**locals()))

                # If every group has a negative cycle,
                # drop the output table as well.
                if table_is_empty(out_table):
                    plpy.execute("DROP TABLE IF EXISTS {0},{1}".
                        format(out_table,out_table+"_summary"))

                plpy.warning(
                    """Graph SSSP: Detected a negative cycle in the """ +
                    """sub-graphs of following groups: {0}.""".
                    format(str(negs)[1:-1]))

        plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
        if is_hawq:
            plpy.execute("DROP TABLE IF EXISTS {temp_table} ".
                format(**locals()))

    return None
| |
def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, path_table,
        **kwargs):
    """
    Helper function that can be used to get the shortest path for a vertex
    Args:
        @param sssp_table   Name of the table that contains the SSSP output.
        @param dest_vertex  The vertex that will be the destination of the
                            desired path.
        @param path_table   Name of the output table that contains the path.

    Creates 'path_table' with one row per group containing the array of
    vertex ids from the source (recorded in <sssp_table>_summary) to
    dest_vertex; groups that cannot reach dest_vertex get an empty array.
    """
    with MinWarning("warning"):
        _validate_get_path(sssp_table, dest_vertex, path_table)

        # Two scratch tables are alternated while walking the 'parent'
        # chain backwards from the destination to the source.
        temp1_name = unique_string(desp='temp1')
        temp2_name = unique_string(desp='temp2')

        # Grouping-related SQL fragments; empty strings when the SSSP run
        # had no grouping.
        select_grps = ""
        check_grps_t1 = ""
        check_grps_t2 = ""
        grp_comma = ""
        tmp = ""

        # The summary table records the parameters of the SSSP run;
        # 'NULL' marks parameters the user did not supply.
        summary = plpy.execute("SELECT * FROM {0}_summary".format(sssp_table))
        vertex_id = summary[0]['vertex_id']
        source_vertex = summary[0]['source_vertex']

        if vertex_id == "NULL":
            vertex_id = "id"

        grouping_cols = summary[0]['grouping_cols']
        if grouping_cols == "NULL":
            grouping_cols = None

        if grouping_cols is not None:
            glist = split_quoted_delimited_str(grouping_cols)
            select_grps = _grp_from_table(sssp_table,glist) + " , "
            check_grps_t1 = " AND " + _check_groups(
                sssp_table,temp1_name,glist)
            check_grps_t2 = " AND " + _check_groups(
                sssp_table,temp2_name,glist)

            grp_comma = grouping_cols + " , "

        # Trivial case: the path from the source to itself.
        if source_vertex == dest_vertex:
            plpy.execute("""
                CREATE TABLE {path_table} AS
                SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path
                FROM {sssp_table} WHERE {vertex_id} = {dest_vertex}
                """.format(**locals()))
            return

        plpy.execute("DROP TABLE IF EXISTS {0},{1}".
            format(temp1_name,temp2_name))
        # Seed the walk with the parent of the destination vertex.
        out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
                SELECT {grp_comma} {sssp_table}.parent AS {vertex_id},
                    ARRAY[{dest_vertex}] AS path
                FROM {sssp_table}
                WHERE {vertex_id} = {dest_vertex}
                    AND {sssp_table}.parent IS NOT NULL
            """.format(**locals()))

        plpy.execute("""
            CREATE TEMP TABLE {temp2_name} AS
            SELECT * FROM {temp1_name} LIMIT 0
            """.format(**locals()))

        # Follow the 'parent' chain until you reach the source.
        while out.nrows() > 0:

            plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
            # If the vertex id is not the source vertex,
            # add it to the path and move to its parent.
            out = plpy.execute(
                """ INSERT INTO {temp2_name}
                SELECT {select_grps} {sssp_table}.parent AS {vertex_id},
                    {sssp_table}.{vertex_id} || {temp1_name}.path AS path
                FROM {sssp_table} INNER JOIN {temp1_name} ON
                    ({sssp_table}.{vertex_id} = {temp1_name}.{vertex_id}
                        {check_grps_t1})
                WHERE {source_vertex} <> {sssp_table}.{vertex_id}
                """.format(**locals()))

            # Swap the scratch tables (and their matching group predicates)
            # for the next step of the walk.
            tmp = temp2_name
            temp2_name = temp1_name
            temp1_name = tmp

            tmp = check_grps_t1
            check_grps_t1 = check_grps_t2
            check_grps_t2 = tmp

        # Add the source vertex to the beginning of every path and
        # add the empty arrays for the groups that don't have a path to reach
        # the destination vertex
        plpy.execute("""
            CREATE TABLE {path_table} AS
            SELECT {grp_comma} {source_vertex} || path AS path
            FROM {temp2_name}
            UNION
            SELECT {grp_comma} '{{}}'::INT[] AS path
            FROM {sssp_table}
            WHERE {vertex_id} = {dest_vertex}
                AND {sssp_table}.parent IS NULL
            """.format(**locals()))

        out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))

        if out.nrows() == 0:
            plpy.error(
                "Graph SSSP: Vertex {0} is not present in the SSSP table {1}".
                format(dest_vertex,sssp_table))

        # Fix: the original dropped {temp1_name} twice, leaking the other
        # scratch table. Drop both names; the pair may have been swapped an
        # odd number of times in the loop, so both must be covered.
        plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp2_name}".
            format(**locals()))

    return None
| |
| |
def _validate_sssp(vertex_table, vertex_id, edge_table, edge_params,
        source_vertex, out_table, glist, **kwargs):
    """
    Validate the arguments of graph_sssp; aborts via _assert/plpy.error
    on the first problem found.
    Args:
        @param vertex_table   Name of the table that contains the vertex data.
        @param vertex_id      Name of the column containing the vertex ids.
        @param edge_table     Name of the table that contains the edge data.
        @param edge_params    Parsed edge arguments (src/dest/weight columns).
        @param source_vertex  The source vertex id.
        @param out_table      Name of the output table.
        @param glist          The list of grouping columns (or None).
    """
    # Structural checks shared by all graph modules
    # (tables/columns exist, output table is free, etc.).
    validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
        out_table,'SSSP')

    _assert(isinstance(source_vertex,int),
        """Graph SSSP: Source vertex {source_vertex} has to be an integer.""".
        format(**locals()))

    # The source vertex must appear in the vertex table.
    found = plpy.execute("""
        SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex}
        """.format(**locals()))
    if not found.nrows():
        plpy.error(
            """Graph SSSP: Source vertex {source_vertex} is not present in the vertex table {vertex_table}.""".
            format(**locals()))

    # Vertex ids must be unique.
    duplicates = plpy.execute(
        """ SELECT {vertex_id}
            FROM {vertex_table}
            WHERE {vertex_id} IS NOT NULL
            GROUP BY {vertex_id}
            HAVING count(*) > 1 """.format(**locals()))
    if duplicates.nrows():
        plpy.error(
            """Graph SSSP: Source vertex table {vertex_table} contains duplicate vertex id's.""".
            format(**locals()))

    _assert(not table_exists(out_table+"_summary"),
        "Graph SSSP: Output summary table already exists!")

    if glist is not None:
        _assert(columns_exist_in_table(edge_table, glist),
            """Graph SSSP: Not all columns from {glist} are present in edge table ({edge_table}).""".
            format(**locals()))

    return None
| |
def _validate_get_path(sssp_table, dest_vertex, path_table, **kwargs):
    """
    Validate the arguments of graph_sssp_get_path; aborts via _assert on
    the first problem found.
    Args:
        @param sssp_table   Name of the table that contains the SSSP output.
        @param dest_vertex  The destination vertex of the desired path.
        @param path_table   Name of the output table for the path.
    """
    # The SSSP result table must be a valid, existing, non-empty table.
    _assert(sssp_table and sssp_table.strip().lower() not in ('null', ''),
        "Graph SSSP: Invalid SSSP table name!")
    _assert(table_exists(sssp_table),
        "Graph SSSP: SSSP table ({0}) is missing!".format(sssp_table))
    _assert(not table_is_empty(sssp_table),
        "Graph SSSP: SSSP table ({0}) is empty!".format(sssp_table))

    # Its companion summary table must exist too; it holds the run
    # parameters needed to reconstruct the path.
    summary_table = "{0}_summary".format(sssp_table)
    _assert(table_exists(summary_table),
        "Graph SSSP: SSSP summary table ({0}) is missing!".format(summary_table))
    _assert(not table_is_empty(summary_table),
        "Graph SSSP: SSSP summary table ({0}) is empty!".format(summary_table))

    # The output must not clobber an existing table.
    _assert(not table_exists(path_table),
        "Graph SSSP: Output path table already exists!")

    return None
| |
def graph_sssp_help(schema_madlib, message, **kwargs):
    """
    Help function for graph_sssp and graph_sssp_get_path

    Args:
        @param schema_madlib
        @param message: string, Help message string
        @param kwargs

    Returns:
        String. Help/usage information
    """
    if not message:
        help_string = """
-----------------------------------------------------------------------
                            SUMMARY
-----------------------------------------------------------------------

Given a graph and a source vertex, single source shortest path (SSSP)
algorithm finds a path for every vertex such that the sum of the
weights of its constituent edges is minimized.

For more details on function usage:
    SELECT {schema_madlib}.graph_sssp('usage')
"""
    elif message.lower() in ['usage', 'help', '?']:
        help_string = """
{graph_usage}

To retrieve the path for a specific vertex:

SELECT {schema_madlib}.graph_sssp_get_path(
    sssp_table  TEXT, -- Name of the table that contains the SSSP output.
    dest_vertex INT,  -- The vertex that will be the destination of the
                      -- desired path.
    path_table  TEXT  -- Name of the output table that contains the path.
);

----------------------------------------------------------------------------
                            OUTPUT
----------------------------------------------------------------------------
The output of SSSP ('out_table' above) contains a row for every vertex of
every group and have the following columns (in addition to the grouping
columns):
  - vertex_id : The id for the destination. Will use the input parameter
                'vertex_id' for column naming.
  - weight    : The total weight of the shortest path from the source vertex
                to this particular vertex.
                Will use the input parameter 'weight' for column naming.
  - parent    : The parent of this vertex in the shortest path from source.
                Will use 'parent' for column naming.

The output of graph_sssp_get_path ('path_table' above) contains a row for
every group and has the following columns:
  - grouping_cols : The grouping columns given in the creation of the SSSP
                    table. If there are no grouping columns, these columns
                    will not exist and the table will have a single row.
  - path (ARRAY)  : The shortest path from the source vertex (as specified
                    in the SSSP execution) to the destination vertex.
"""
    elif message.lower() in ("example", "examples"):
        help_string = """
----------------------------------------------------------------------------
                            EXAMPLES
----------------------------------------------------------------------------
-- Create a graph, represented as vertex and edge tables.
DROP TABLE IF EXISTS vertex,edge,out,out_summary,out_path;
CREATE TABLE vertex(
        id INTEGER
        );
CREATE TABLE edge(
        src INTEGER,
        dest INTEGER,
        weight DOUBLE PRECISION
        );

INSERT INTO vertex VALUES
(0),
(1),
(2),
(3),
(4),
(5),
(6),
(7)
;
INSERT INTO edge VALUES
(0, 1, 1),
(0, 2, 1),
(0, 4, 10),
(1, 2, 2),
(1, 3, 10),
(2, 3, 1),
(2, 5, 1),
(2, 6, 3),
(3, 0, 1),
(4, 0, -2),
(5, 6, 1),
(6, 7, 1)
;

-- Compute the SSSP:
DROP TABLE IF EXISTS out, out_summary;
SELECT madlib.graph_sssp(
    'vertex',                            -- Vertex table
    'id',                                -- Vertex id column
    'edge',                              -- Edge table
    'src=src, dest=dest, weight=weight', -- Comma delimited string of edge arguments
    0,                                   -- The source vertex
    'out'                                -- Output table of SSSP
);
-- View the SSSP costs for every vertex:
SELECT * FROM out ORDER BY id;

-- View the actual shortest path for a vertex:
SELECT graph_sssp_get_path('out',5,'out_path');
SELECT * FROM out_path;

-- Create a graph with 2 groups:
DROP TABLE IF EXISTS edge_gr;
CREATE TABLE edge_gr AS
(
  SELECT *, 0 AS grp FROM edge
  UNION
  SELECT *, 1 AS grp FROM edge WHERE src < 6 AND dest < 6
);
INSERT INTO edge_gr VALUES
(4,5,-20,1);

-- Find SSSP for all groups:
DROP TABLE IF EXISTS out_gr, out_gr_summary;
SELECT graph_sssp('vertex',NULL,'edge_gr',NULL,0,'out_gr','grp');
"""
    else:
        help_string = "No such option. Use {schema_madlib}.graph_sssp()"

    # Fixed in this help text relative to the original: "the the" doubled
    # word, "Vertix"/"delimted" typos, and a leftover
    # "DROP TABLE IF EXISTS pagerank_out;" copy-pasted from the pagerank
    # module although the example creates a table named 'out'.
    return help_string.format(schema_madlib=schema_madlib,
        graph_usage=get_graph_usage(schema_madlib, 'graph_sssp',
            """source_vertex INT,  -- The source vertex id for the algorithm to start.
    out_table     TEXT, -- Name of the table to store the result of SSSP.
    grouping_cols TEXT  -- The list of grouping columns."""))
# ---------------------------------------------------------------------