blob: a8339a8468c159b53360dc75f299317f2275547f [file] [log] [blame]
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Single Source Shortest Path
# Please refer to the sssp.sql_in file for the documentation
"""
@file sssp.py_in
@namespace graph
"""
import plpy
from graph_utils import *
from utilities.control import MinWarning
from utilities.utilities import _assert
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string
from utilities.utilities import _string_to_array
from utilities.utilities import split_quoted_delimited_str
from utilities.validate_args import table_exists
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_is_empty
from utilities.validate_args import get_cols_and_types
from utilities.validate_args import get_expr_type
# m4 directive: from this point on, m4 quoted strings are delimited by <! and
# !> instead of the default backquote/apostrophe pair. Presumably this keeps
# the Python quote characters below from being interpreted by m4 -- confirm
# against the build scripts.
m4_changequote(`<!', `!>')
def _check_groups(tbl1, tbl2, grp_list):
"""
Helper function for joining tables with groups.
Args:
@param tbl1 Name of the first table
@param tbl2 Name of the second table
@param grp_list The list of grouping columns
"""
return ' AND '.join([" {tbl1}.{i} = {tbl2}.{i} ".format(**locals())
for i in grp_list])
def _grp_from_table(tbl, grp_list):
"""
Helper function for selecting grouping columns of a table
Args:
@param tbl Name of the table
@param grp_list The list of grouping columns
"""
return ' , '.join([" {tbl}.{i} ".format(**locals())
for i in grp_list])
def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
edge_args, source_vertex, out_table, grouping_cols, **kwargs):
"""
Single source shortest path function for graphs using the Bellman-Ford
algorhtm [1].
Args:
@param vertex_table Name of the table that contains the vertex data.
@param vertex_id Name of the column containing the vertex ids.
@param edge_table Name of the table that contains the edge data.
@param edge_args A comma-delimited string containing multiple
named arguments of the form "name=value".
@param source_vertex The source vertex id for the algorithm to start.
@param out_table Name of the table to store the result of SSSP.
@param grouping_cols The list of grouping columns.
[1] https://en.wikipedia.org/wiki/Bellman-Ford_algorithm
"""
with MinWarning("warning"):
INT_MAX = 2147483647
INFINITY = "'Infinity'"
EPSILON = 0.000001
message = unique_string(desp='message')
oldupdate = unique_string(desp='oldupdate')
newupdate = unique_string(desp='newupdate')
params_types = {'src': str, 'dest': str, 'weight': str}
default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
edge_params = extract_keyvalue_params(edge_args,
params_types,
default_args)
# Prepare the input for recording in the summary table
if vertex_id is None:
v_st= "NULL"
vertex_id = "id"
else:
v_st = vertex_id
if edge_args is None:
e_st = "NULL"
else:
e_st = edge_args
if grouping_cols is None:
g_st = "NULL"
glist = None
else:
g_st = grouping_cols
glist = split_quoted_delimited_str(grouping_cols)
src = edge_params["src"]
dest = edge_params["dest"]
weight = edge_params["weight"]
distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
<!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
local_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
<!"DISTRIBUTED BY (id)"!>)
is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
_validate_sssp(vertex_table, vertex_id, edge_table,
edge_params, source_vertex, out_table, glist)
plpy.execute(" DROP TABLE IF EXISTS {0},{1},{2}".format(
message,oldupdate,newupdate))
# Initialize grouping related variables
comma_grp = ""
comma_grp_e = ""
comma_grp_m = ""
grp_comma = ""
checkg_oo = ""
checkg_eo = ""
checkg_ex = ""
checkg_om = ""
group_by = ""
if grouping_cols is not None:
comma_grp = " , " + grouping_cols
group_by = " , " + _grp_from_table(edge_table,glist)
comma_grp_e = " , " + _grp_from_table(edge_table,glist)
comma_grp_m = " , " + _grp_from_table("message",glist)
grp_comma = grouping_cols + " , "
checkg_oo_sub = _check_groups(out_table,"oldupdate",glist)
checkg_oo = " AND " + checkg_oo_sub
checkg_eo = " AND " + _check_groups(edge_table,"oldupdate",glist)
checkg_ex = " AND " + _check_groups(edge_table,"x",glist)
checkg_om = " AND " + _check_groups("out_table","message",glist)
w_type = get_expr_type(weight,edge_table).lower()
init_w = INT_MAX
if w_type in ['double precision','float8']:
init_w = INFINITY
# We keep a table of every vertex, the minimum cost to that destination
# seen so far and the parent to this vertex in the associated shortest
# path. This table will be updated throughout the execution.
plpy.execute(
""" CREATE TABLE {out_table} AS ( SELECT
{grp_comma} {src} AS {vertex_id}, {weight},
{src} AS parent FROM {edge_table} LIMIT 0)
{distribution} """.format(**locals()))
# We keep a summary table to keep track of the parameters used for this
# SSSP run. This table is used in the path finding function to eliminate
# the need for repetition.
plpy.execute( """ CREATE TABLE {out_table}_summary (
vertex_table TEXT,
vertex_id TEXT,
edge_table TEXT,
edge_args TEXT,
source_vertex INTEGER,
out_table TEXT,
grouping_cols TEXT)
""".format(**locals()))
plpy.execute( """ INSERT INTO {out_table}_summary VALUES
('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
{source_vertex}, '{out_table}', '{g_st}')
""".format(**locals()))
# We keep 2 update tables and alternate them during the execution.
# This is necessary since we need to know which vertices are updated in
# the previous iteration to calculate the next set of updates.
plpy.execute(
""" CREATE TEMP TABLE {oldupdate} AS ( SELECT
{src} AS id, {weight},
{src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
{local_distribution}
""".format(**locals()))
plpy.execute(
""" CREATE TEMP TABLE {newupdate} AS ( SELECT
{src} AS id, {weight},
{src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
{local_distribution}
""".format(**locals()))
# Since HAWQ does not allow us to update, we create a new table and
# rename at every iteration.
if is_hawq:
temp_table = unique_string(desp='temp')
sql =""" CREATE TABLE {temp_table} AS ( SELECT * FROM {out_table} )
{distribution} """
plpy.execute(sql.format(**locals()))
# GPDB and HAWQ have distributed by clauses to help them with indexing.
# For Postgres we add the indices manually.
sql_index = m4_ifdef(<!__POSTGRESQL__!>,
<!""" CREATE INDEX ON {out_table} ({vertex_id});
CREATE INDEX ON {oldupdate} (id);
CREATE INDEX ON {newupdate} (id);
""".format(**locals())!>,
<!''!>)
plpy.execute(sql_index)
# The initialization step is quite different when grouping is involved
# since not every group (subgraph) will have the same set of vertices.
# Example:
# Assume there are two grouping columns g1 and g2
# g1 values are 0 and 1. g2 values are 5 and 6
if grouping_cols is not None:
distinct_grp_table = unique_string(desp='grp')
plpy.execute(""" DROP TABLE IF EXISTS {distinct_grp_table} """.
format(**locals()))
plpy.execute( """ CREATE TEMP TABLE {distinct_grp_table} AS
SELECT DISTINCT {grouping_cols} FROM {edge_table} """.
format(**locals()))
subq = unique_string(desp='subquery')
checkg_ds_sub = _check_groups(distinct_grp_table,subq,glist)
grp_d_comma = _grp_from_table(distinct_grp_table,glist) +","
plpy.execute(
""" INSERT INTO {out_table}
SELECT {grp_d_comma} {vertex_id} AS {vertex_id},
{init_w} AS {weight}, NULL::INT AS parent
FROM {distinct_grp_table} INNER JOIN
(
SELECT {src} AS {vertex_id} {comma_grp}
FROM {edge_table}
UNION
SELECT {dest} AS {vertex_id} {comma_grp}
FROM {edge_table}
) {subq} ON ({checkg_ds_sub})
WHERE {vertex_id} IS NOT NULL
""".format(**locals()))
plpy.execute(
""" INSERT INTO {oldupdate}
SELECT {source_vertex}, 0, {source_vertex},
{grouping_cols}
FROM {distinct_grp_table}
""".format(**locals()))
# The maximum number of vertices for any group.
# Used for determining negative cycles.
v_cnt = plpy.execute(
""" SELECT max(count) as max FROM (
SELECT count({vertex_id}) AS count
FROM {out_table}
GROUP BY {grouping_cols}) x
""".format(**locals()))[0]['max']
plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
else:
plpy.execute(
""" INSERT INTO {out_table}
SELECT {vertex_id} AS {vertex_id},
{init_w} AS {weight},
NULL AS parent
FROM {vertex_table}
WHERE {vertex_id} IS NOT NULL
""".format(**locals()))
# The source can be reached with 0 cost and it has itself as the
# parent.
plpy.execute(
""" INSERT INTO {oldupdate}
VALUES({source_vertex},0,{source_vertex})
""".format(**locals()))
v_cnt = plpy.execute(
""" SELECT count(*) FROM {vertex_table}
WHERE {vertex_id} IS NOT NULL
""".format(**locals()))[0]['count']
for i in range(0,v_cnt+1):
# Apply the updates calculated in the last iteration.
if is_hawq:
sql = """
TRUNCATE TABLE {temp_table};
INSERT INTO {temp_table}
SELECT *
FROM {out_table}
WHERE NOT EXISTS (
SELECT 1
FROM {oldupdate} as oldupdate
WHERE {out_table}.{vertex_id} = oldupdate.id
{checkg_oo})
UNION
SELECT {grp_comma} id, {weight}, parent FROM {oldupdate};
"""
plpy.execute(sql.format(**locals()))
plpy.execute("DROP TABLE {0}".format(out_table))
plpy.execute("ALTER TABLE {0} RENAME TO {1}".
format(temp_table,out_table))
sql = """ CREATE TABLE {temp_table} AS (
SELECT * FROM {out_table} LIMIT 0)
{distribution};"""
plpy.execute(sql.format(**locals()))
ret = plpy.execute("SELECT id FROM {0} LIMIT 1".
format(oldupdate))
else:
sql = """
UPDATE {out_table} SET
{weight}=oldupdate.{weight},
parent=oldupdate.parent
FROM
{oldupdate} AS oldupdate
WHERE
{out_table}.{vertex_id}=oldupdate.id AND
{out_table}.{weight}>oldupdate.{weight} {checkg_oo}
"""
ret = plpy.execute(sql.format(**locals()))
if ret.nrows() == 0:
break
plpy.execute("TRUNCATE TABLE {0}".format(newupdate))
# 'oldupdate' table has the update info from the last iteration
# Consider every edge that has an updated source
# From these edges:
# For every destination vertex, find the min total cost to reach.
# Note that, just calling an aggregate function with group by won't
# let us store the src field of the edge (needed for the parent).
# This is why we need the 'x'; it gives a list of destinations and
# associated min values. Using these values, we identify which edge
# is selected.
# Since using '='' with floats is dangerous we use an epsilon value
# for comparison.
# Once we have a list of edges and values (stores as 'message'),
# we check if these values are lower than the existing shortest
# path values.
sql = (""" INSERT INTO {newupdate}
SELECT DISTINCT ON (message.id {comma_grp})
message.id AS id,
message.{weight} AS {weight},
message.parent AS parent {comma_grp_m}
FROM {out_table} AS out_table INNER JOIN
(
SELECT {edge_table}.{dest} AS id, x.{weight} AS {weight},
oldupdate.id AS parent {comma_grp_e}
FROM {oldupdate} AS oldupdate INNER JOIN
{edge_table} ON
({edge_table}.{src} = oldupdate.id {checkg_eo})
INNER JOIN
(
SELECT {edge_table}.{dest} AS id,
min(oldupdate.{weight} +
{edge_table}.{weight}) AS {weight} {comma_grp_e}
FROM {oldupdate} AS oldupdate INNER JOIN
{edge_table} ON
({edge_table}.{src}=oldupdate.id {checkg_eo})
GROUP BY {edge_table}.{dest} {comma_grp_e}
) x
ON ({edge_table}.{dest} = x.id {checkg_ex} )
WHERE ABS(oldupdate.{weight} + {edge_table}.{weight}
- x.{weight}) < {EPSILON}
) message
ON (message.id = out_table.{vertex_id} {checkg_om})
WHERE message.{weight}<out_table.{weight}
""".format(**locals()))
plpy.execute(sql)
# Swap the update tables for the next iteration.
tmp = oldupdate
oldupdate = newupdate
newupdate = tmp
plpy.execute("DROP TABLE IF EXISTS {0}".format(newupdate))
# The algorithm should converge in less than |V| iterations.
# Otherwise there is a negative cycle in the graph.
if i == v_cnt:
if grouping_cols is None:
plpy.execute("DROP TABLE IF EXISTS {0},{1},{2}".
format(out_table, out_table+"_summary", oldupdate))
if is_hawq:
plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_table))
plpy.error("Graph SSSP: Detected a negative cycle in the graph.")
# It is possible that not all groups has negative cycles.
else:
# grp is the string created by collating grouping columns.
# By looking at the oldupdate table we can see which groups
# are in a negative cycle.
negs = plpy.execute(
""" SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
FROM {oldupdate}
""".format(**locals()))[0]['grp']
# Delete the groups with negative cycles from the output table.
if is_hawq:
sql_del = """
TRUNCATE TABLE {temp_table};
INSERT INTO {temp_table}
SELECT *
FROM {out_table}
WHERE NOT EXISTS(
SELECT 1
FROM {oldupdate} as oldupdate
WHERE {checkg_oo_sub}
);"""
plpy.execute(sql_del.format(**locals()))
plpy.execute("DROP TABLE {0}".format(out_table))
plpy.execute("ALTER TABLE {0} RENAME TO {1}".
format(temp_table,out_table))
else:
sql_del = """ DELETE FROM {out_table}
USING {oldupdate} AS oldupdate
WHERE {checkg_oo_sub}"""
plpy.execute(sql_del.format(**locals()))
# If every group has a negative cycle,
# drop the output table as well.
if table_is_empty(out_table):
plpy.execute("DROP TABLE IF EXISTS {0},{1}".
format(out_table,out_table+"_summary"))
plpy.warning(
"""Graph SSSP: Detected a negative cycle in the """ +
"""sub-graphs of following groups: {0}.""".
format(str(negs)[1:-1]))
plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
if is_hawq:
plpy.execute("DROP TABLE IF EXISTS {temp_table} ".
format(**locals()))
return None
def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, path_table,
                        **kwargs):
    """
    Helper function that can be used to get the shortest path for a vertex.

    Reads the vertex id column, the source vertex and the grouping columns
    from {sssp_table}_summary, follows the 'parent' chain of the destination
    vertex back to the source and stores the resulting path(s) in
    {path_table}.

    Args:
        @param schema_madlib    Name of the MADlib schema (not referenced
                                in this function).
        @param sssp_table       Name of the table that contains the SSSP output.
        @param dest_vertex      The vertex that will be the destination of the
                                desired path.
        @param path_table       Name of the output table that contains the path.
    Returns:
        None. An error is raised through plpy.error if the resulting path
        table has no rows.
    """

    with MinWarning("warning"):
        _validate_get_path(sssp_table, dest_vertex, path_table)

        temp1_name = unique_string(desp='temp1')
        temp2_name = unique_string(desp='temp2')

        # Grouping related SQL fragments; empty when there is no grouping.
        select_grps = ""
        check_grps_t1 = ""
        check_grps_t2 = ""
        grp_comma = ""

        summary = plpy.execute("SELECT * FROM {0}_summary".format(sssp_table))
        vertex_id = summary[0]['vertex_id']
        source_vertex = summary[0]['source_vertex']

        # The summary table stores missing parameters as the string "NULL".
        if vertex_id == "NULL":
            vertex_id = "id"

        grouping_cols = summary[0]['grouping_cols']
        if grouping_cols == "NULL":
            grouping_cols = None

        if grouping_cols is not None:
            glist = split_quoted_delimited_str(grouping_cols)
            select_grps = _grp_from_table(sssp_table, glist) + " , "
            check_grps_t1 = " AND " + _check_groups(
                sssp_table, temp1_name, glist)
            check_grps_t2 = " AND " + _check_groups(
                sssp_table, temp2_name, glist)

            grp_comma = grouping_cols + " , "

        # The path from the source to itself is just the source.
        if source_vertex == dest_vertex:
            plpy.execute("""
                CREATE TABLE {path_table} AS
                SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path
                FROM {sssp_table} WHERE {vertex_id} = {dest_vertex}
                """.format(**locals()))
            return

        plpy.execute("DROP TABLE IF EXISTS {0},{1}".
                     format(temp1_name, temp2_name))
        out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
                SELECT {grp_comma} {sssp_table}.parent AS {vertex_id},
                    ARRAY[{dest_vertex}] AS path
                FROM {sssp_table}
                WHERE {vertex_id} = {dest_vertex}
                    AND {sssp_table}.parent IS NOT NULL
            """.format(**locals()))

        plpy.execute("""
            CREATE TEMP TABLE {temp2_name} AS
                SELECT * FROM {temp1_name} LIMIT 0
            """.format(**locals()))

        # Follow the 'parent' chain until you reach the source.
        while out.nrows() > 0:

            plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
            # If the vertex id is not the source vertex,
            # Add it to the path and move to its parent
            out = plpy.execute(
                """ INSERT INTO {temp2_name}
                SELECT {select_grps} {sssp_table}.parent AS {vertex_id},
                    {sssp_table}.{vertex_id} || {temp1_name}.path AS path
                FROM {sssp_table} INNER JOIN {temp1_name} ON
                    ({sssp_table}.{vertex_id} = {temp1_name}.{vertex_id}
                        {check_grps_t1})
                WHERE {source_vertex} <> {sssp_table}.{vertex_id}
                """.format(**locals()))

            # Swap the two temp tables (and their matching group checks) so
            # that the freshly filled table is the input of the next step.
            temp1_name, temp2_name = temp2_name, temp1_name
            check_grps_t1, check_grps_t2 = check_grps_t2, check_grps_t1

        # Add the source vertex to the beginning of every path and
        # add the empty arrays for the groups that don't have a path to reach
        # the destination vertex
        plpy.execute("""
            CREATE TABLE {path_table} AS
            SELECT {grp_comma} {source_vertex} || path AS path
            FROM {temp2_name}
            UNION
            SELECT {grp_comma} '{{}}'::INT[] AS path
            FROM {sssp_table}
            WHERE {vertex_id} = {dest_vertex}
                AND {sssp_table}.parent IS NULL
            """.format(**locals()))

        out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))

        if out.nrows() == 0:
            plpy.error(
                "Graph SSSP: Vertex {0} is not present in the SSSP table {1}".
                format(dest_vertex, sssp_table))

        # Drop both temporary tables (the original dropped the first one
        # twice and never dropped the second).
        plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp2_name}".
                     format(**locals()))

    return None
def _validate_sssp(vertex_table, vertex_id, edge_table, edge_params,
                   source_vertex, out_table, glist, **kwargs):
    """
    Validate the arguments of graph_sssp before any output is created.

    Runs the shared graph validation, then checks that the source vertex is
    an integer present in the vertex table, that the vertex table has no
    duplicate non-NULL ids, that the summary table does not exist yet and
    that every grouping column is present in the edge table.

    Args:
        @param vertex_table     Name of the table that contains the vertex data.
        @param vertex_id        Name of the column containing the vertex ids.
        @param edge_table       Name of the table that contains the edge data.
        @param edge_params      Dictionary with the edge column names.
        @param source_vertex    The source vertex id.
        @param out_table        Name of the SSSP output table.
        @param glist            List of grouping columns, or None.
    Returns:
        None. Failures are reported through _assert / plpy.error.
    """

    validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
                          out_table, 'SSSP')

    source_is_int = isinstance(source_vertex, int)
    not_int_msg = """Graph SSSP: Source vertex {source_vertex} has to be an integer.""".format(
        source_vertex=source_vertex)
    _assert(source_is_int, not_int_msg)

    # The source vertex has to be one of the known vertices.
    source_query = """
        SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex}
        """.format(vertex_table=vertex_table,
                   vertex_id=vertex_id,
                   source_vertex=source_vertex)
    src_exists = plpy.execute(source_query)
    if not src_exists.nrows():
        plpy.error(
            """Graph SSSP: Source vertex {source_vertex} is not present in the vertex table {vertex_table}.""".format(
                source_vertex=source_vertex, vertex_table=vertex_table))

    # Any id that occurs more than once makes the vertex table invalid.
    duplicates_query = """ SELECT {vertex_id}
            FROM {vertex_table}
            WHERE {vertex_id} IS NOT NULL
            GROUP BY {vertex_id}
            HAVING count(*) > 1 """.format(vertex_id=vertex_id,
                                           vertex_table=vertex_table)
    vt_error = plpy.execute(duplicates_query)
    if vt_error.nrows() != 0:
        plpy.error(
            """Graph SSSP: Source vertex table {vertex_table} contains duplicate vertex id's.""".format(
                vertex_table=vertex_table))

    summary_table = out_table + "_summary"
    _assert(not table_exists(summary_table),
            "Graph SSSP: Output summary table already exists!")

    if glist is not None:
        all_present = columns_exist_in_table(edge_table, glist)
        _assert(all_present,
                """Graph SSSP: Not all columns from {glist} are present in edge table ({edge_table}).""".format(
                    glist=glist, edge_table=edge_table))

    return None
def _validate_get_path(sssp_table, dest_vertex, path_table, **kwargs):
    """
    Validate the arguments of graph_sssp_get_path.

    Checks that the SSSP table name is usable, that the SSSP table and its
    summary table both exist and are not empty, and that the output path
    table does not exist yet.

    Args:
        @param sssp_table   Name of the table that contains the SSSP output.
        @param dest_vertex  The destination vertex (not checked here).
        @param path_table   Name of the output table for the path.
    Returns:
        None. Failures are reported through _assert.
    """

    name_is_valid = (sssp_table and
                     sssp_table.strip().lower() not in ('null', ''))
    _assert(name_is_valid, "Graph SSSP: Invalid SSSP table name!")

    # The summary name is only built once the table name itself is known
    # to be usable.
    summary = sssp_table + "_summary"
    required_tables = (
        (sssp_table,
         "Graph SSSP: SSSP table ({0}) is missing!",
         "Graph SSSP: SSSP table ({0}) is empty!"),
        (summary,
         "Graph SSSP: SSSP summary table ({0}) is missing!",
         "Graph SSSP: SSSP summary table ({0}) is empty!"),
    )
    for tbl, missing_msg, empty_msg in required_tables:
        _assert(table_exists(tbl), missing_msg.format(tbl))
        _assert(not table_is_empty(tbl), empty_msg.format(tbl))

    _assert(not table_exists(path_table),
            "Graph SSSP: Output path table already exists!")

    return None
def graph_sssp_help(schema_madlib, message, **kwargs):
    """
    Help function for graph_sssp and graph_sssp_get_path

    Args:
        @param schema_madlib    Name of the MADlib schema, substituted into
                                the help text.
        @param message: string, Help message string. An empty value returns
                                the summary; 'usage', 'help' or '?' returns
                                the usage text; 'example' or 'examples'
                                returns the examples.
        @param kwargs

    Returns:
        String. Help/usage information
    """
    if not message:
        help_string = """
-----------------------------------------------------------------------
SUMMARY
-----------------------------------------------------------------------
Given a graph and a source vertex, single source shortest path (SSSP)
algorithm finds a path for every vertex such that the sum of the
weights of its constituent edges is minimized.
For more details on function usage:
SELECT {schema_madlib}.graph_sssp('usage')
"""
    elif message.lower() in ['usage', 'help', '?']:
        help_string = """
{graph_usage}
To retrieve the path for a specific vertex:
SELECT {schema_madlib}.graph_sssp_get_path(
sssp_table TEXT, -- Name of the table that contains the SSSP output.
dest_vertex INT, -- The vertex that will be the destination of the
-- desired path.
path_table TEXT -- Name of the output table that contains the path.
);
----------------------------------------------------------------------------
OUTPUT
----------------------------------------------------------------------------
The output of SSSP ('out_table' above) contains a row for every vertex of
every group and have the following columns (in addition to the grouping
columns):
- vertex_id : The id for the destination. Will use the input parameter
'vertex_id' for column naming.
- weight : The total weight of the shortest path from the source vertex
to this particular vertex.
Will use the input parameter 'weight' for column naming.
- parent : The parent of this vertex in the shortest path from source.
Will use 'parent' for column naming.
The output of graph_sssp_get_path ('path_table' above) contains a row for
every group and has the following columns:
- grouping_cols : The grouping columns given in the creation of the SSSP
table. If there are no grouping columns, these columns
will not exist and the table will have a single row.
- path (ARRAY) : The shortest path from the source vertex (as specified
in the SSSP execution) to the destination vertex.
"""
    elif message.lower() in ("example", "examples"):
        help_string = """
----------------------------------------------------------------------------
EXAMPLES
----------------------------------------------------------------------------
-- Create a graph, represented as vertex and edge tables.
DROP TABLE IF EXISTS vertex,edge,out,out_summary,out_path;
CREATE TABLE vertex(
id INTEGER
);
CREATE TABLE edge(
src INTEGER,
dest INTEGER,
weight DOUBLE PRECISION
);
INSERT INTO vertex VALUES
(0),
(1),
(2),
(3),
(4),
(5),
(6),
(7)
;
INSERT INTO edge VALUES
(0, 1, 1),
(0, 2, 1),
(0, 4, 10),
(1, 2, 2),
(1, 3, 10),
(2, 3, 1),
(2, 5, 1),
(2, 6, 3),
(3, 0, 1),
(4, 0, -2),
(5, 6, 1),
(6, 7, 1)
;
-- Compute the SSSP:
DROP TABLE IF EXISTS out, out_summary;
SELECT madlib.graph_sssp(
'vertex', -- Vertex table
'id', -- Vertex id column
'edge', -- Edge table
'src=src, dest=dest, weight=weight', -- Comma delimited string of edge arguments
0, -- The source vertex
'out' -- Output table of SSSP
);
-- View the SSSP costs for every vertex:
SELECT * FROM out ORDER BY id;
-- View the actual shortest path for a vertex:
SELECT graph_sssp_get_path('out',5,'out_path');
SELECT * FROM out_path;
-- Create a graph with 2 groups:
DROP TABLE IF EXISTS edge_gr;
CREATE TABLE edge_gr AS
(
SELECT *, 0 AS grp FROM edge
UNION
SELECT *, 1 AS grp FROM edge WHERE src < 6 AND dest < 6
);
INSERT INTO edge_gr VALUES
(4,5,-20,1);
-- Find SSSP for all groups:
DROP TABLE IF EXISTS out_gr, out_gr_summary;
SELECT graph_sssp('vertex',NULL,'edge_gr',NULL,0,'out_gr','grp');
"""
    else:
        help_string = "No such option. Use {schema_madlib}.graph_sssp()"

    # The shared usage text is always built, but only the usage message
    # contains the {graph_usage} placeholder that consumes it.
    return help_string.format(schema_madlib=schema_madlib,
        graph_usage=get_graph_usage(schema_madlib, 'graph_sssp',
"""source_vertex INT, -- The source vertex id for the algorithm to start.
out_table TEXT, -- Name of the table to store the result of SSSP.
grouping_cols TEXT -- The list of grouping columns."""))
# ---------------------------------------------------------------------