blob: e854ab9ee6554db7cdc1d833fdf770897337c259 [file] [log] [blame]
# coding=utf-8
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# All Pairs Shortest Path
# Please refer to the apsp.sql_in file for the documentation
@file apsp.py_in
@namespace graph
import plpy
from graph_utils import validate_graph_coding
from graph_utils import get_graph_usage
from graph_utils import get_edge_params
from utilities.control import MinWarning
from utilities.utilities import _assert
from utilities.utilities import _check_groups
from utilities.utilities import get_table_qualified_col_str
from utilities.utilities import add_postfix
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string
from utilities.utilities import split_quoted_delimited_str
from utilities.utilities import is_platform_pg
from utilities.validate_args import table_exists
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_is_empty
from utilities.validate_args import get_expr_type
def graph_apsp(schema_madlib, vertex_table, vertex_id, edge_table,
edge_args, out_table, grouping_cols, **kwargs):
All Pairs shortest path function for graphs using the matrix
multiplication based algorithm [1].
@param vertex_table Name of the table that contains the vertex data.
@param vertex_id Name of the column containing the vertex ids.
@param edge_table Name of the table that contains the edge data.
@param edge_args A comma-delimited string containing multiple
named arguments of the form "name=value".
@param out_table Name of the table to store the result of APSP.
@param grouping_cols The list of grouping columns.
with MinWarning("warning"):
INT_MAX = 2147483647
INFINITY = "'Infinity'"
EPSILON = 0.000001
params_types = {'src': str, 'dest': str, 'weight': str}
default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
edge_params = extract_keyvalue_params(edge_args, params_types, default_args)
# Prepare the input for recording in the summary table
if not vertex_id:
v_st = ''
vertex_id = "id"
v_st = vertex_id
if not edge_args:
e_st = ''
e_st = edge_args
if not grouping_cols:
g_st = ''
glist = None
g_st = grouping_cols
glist = split_quoted_delimited_str(grouping_cols)
src = edge_params["src"]
dest = edge_params["dest"]
weight = edge_params["weight"]
distribution = '' if is_platform_pg() else "DISTRIBUTED BY ({0})".format(src)
_validate_apsp(vertex_table, vertex_id, edge_table,
edge_params, out_table, glist)
out_table_1 = unique_string(desp='out_table_1')
out_table_2 = unique_string(desp='out_table_2')
tmp_view = unique_string(desp='tmp_view')
v1 = unique_string(desp='v1')
v2 = unique_string(desp='v2')
message = unique_string(desp='message')
# Initialize grouping related variables
comma_grp = ""
comma_grp_e = ""
comma_grp_m = ""
grp_comma = ""
grp_v1_comma = ""
grp_o1_comma = ""
grp_o_comma = ""
checkg_eo = ""
checkg_eout = ""
checkg_ex = ""
checkg_om = ""
checkg_o1t_sub = ""
checkg_ot_sub = ""
checkg_ot = ""
checkg_o1t = ""
checkg_vv = ""
checkg_o2v = ""
checkg_oy = ""
checkg_vv_sub = "TRUE"
grp_by = ""
if grouping_cols:
# We use actual table names in some cases and aliases in others
# In some cases, we swap the table names so use of an alias is
# necessary. In other cases, they are used to simplify debugging.
comma_grp = " , " + grouping_cols
comma_grp_e = " , " + get_table_qualified_col_str("edge", glist)
comma_grp_m = " , " + get_table_qualified_col_str(message, glist)
grp_comma = grouping_cols + " , "
grp_v1_comma = get_table_qualified_col_str("v1", glist) + " , "
grp_o1_comma = get_table_qualified_col_str(out_table_1, glist) + " , "
grp_o_comma = get_table_qualified_col_str("out", glist) + " , "
checkg_eo = " AND " + _check_groups(edge_table, out_table, glist)
checkg_eout = " AND " + _check_groups("edge", "out", glist)
checkg_ex = " AND " + _check_groups("edge", "x", glist)
checkg_om = " AND " + _check_groups(out_table, message, glist)
checkg_o1t_sub = _check_groups("out", tmp_view, glist)
checkg_ot_sub = _check_groups(out_table, tmp_view, glist)
checkg_ot = " AND " + _check_groups(out_table, tmp_view, glist)
checkg_o1t = " AND " + _check_groups("out", "t", glist)
checkg_vv = " AND " + _check_groups("v1", "v2", glist)
checkg_o2v = " AND " + _check_groups(out_table_2, "v2", glist)
checkg_oy = " AND " + _check_groups("out", "y", glist)
checkg_vv_sub = _check_groups("v1", "v2", glist)
grp_by = " GROUP BY " + grouping_cols
w_type = get_expr_type(weight, edge_table).lower()
init_w = INT_MAX
if w_type in ['real', 'double precision', 'float8']:
init_w = INFINITY
# We keep a summary table to keep track of the parameters used for this
# APSP run. This table is used in the path finding function to eliminate
# the need for repetition.
summary_table = add_postfix(out_table, "_summary")
plpy.execute(""" CREATE TABLE {summary_table} (
vertex_table TEXT,
vertex_id TEXT,
edge_table TEXT,
edge_args TEXT,
out_table TEXT,
grouping_cols TEXT)
plpy.execute(""" INSERT INTO {summary_table} VALUES
('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
'{out_table}', '{g_st}') """.format(**locals()))
plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
# Find all of the vertices involved with a given group
plpy.execute(""" CREATE VIEW {tmp_view} AS
SELECT {src} AS {vertex_id} {comma_grp}
FROM {edge_table} WHERE {src} IS NOT NULL
SELECT {dest} AS {vertex_id} {comma_grp}
FROM {edge_table} WHERE {dest} IS NOT NULL
# Don't use the unnecessary rows of the output table during joins.
ot_sql = """ SELECT * FROM {out_table}
WHERE {weight} != {init_w} AND {src} != {dest} """
plpy.execute(""" CREATE TABLE {out_table} AS
(SELECT {grp_comma} {src}, {dest}, {weight},
{src} AS parent FROM {edge_table} LIMIT 0)
{distribution} """.format(**locals()))
plpy.execute(""" INSERT INTO {out_table}
SELECT {grp_v1_comma}
v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest},
{init_w} AS {weight}, NULL::INT AS parent
{tmp_view} AS v1 INNER JOIN
{tmp_view} AS v2 ON ({checkg_vv_sub})
plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
# GPDB has distributed by clauses to help them with indexing.
# For Postgres we add the indices manually.
if is_platform_pg():
sql_index = "CREATE INDEX ON {0}({1})".format(out_table, src)
sql_index = ''
# The source can be reached with 0 cost and next is itself.
""" UPDATE {out_table} SET
{weight} = 0, parent = {vertex_table}.{vertex_id}
FROM {vertex_table}
WHERE {out_table}.{src} = {vertex_table}.{vertex_id}
AND {out_table}.{dest} = {vertex_table}.{vertex_id}
# Distance = 1: every edge means there is a path from src to dest
# There may be multiple edges defined as a->b,
# we only need the minimum weighted one.
""" CREATE VIEW {tmp_view} AS
SELECT {grp_comma} {src}, {dest},
min({weight}) AS {weight}
FROM {edge_table}
GROUP BY {grp_comma} {src}, {dest}
""" UPDATE {out_table} SET
{weight} = {tmp_view}.{weight}, parent = {tmp_view}.{dest}
FROM {tmp_view}
WHERE {out_table}.{src} = {tmp_view}.{src}
AND {out_table}.{dest} = {tmp_view}.{dest}
AND {out_table}.{weight} > {tmp_view}.{weight} {checkg_ot}
plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
ot_sql1 = ot_sql.format(**locals())
out_table_1 = out_table
# Find the maximum number of iterations to try
# If not done by v_cnt iterations, there is a negative cycle.
v_cnt = plpy.execute(
""" SELECT max(count) as max FROM (
SELECT count(DISTINCT {src}) AS count
FROM {out_table_1}
{grp_by}) x
if v_cnt < 2:
plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
plpy.execute("DROP TABLE IF EXISTS {0},{1}".
format(out_table, summary_table))
if grouping_cols:
plpy.error(("Graph APSP: {0} has less than 2 vertices in " +
"every group.").format(edge_table))
plpy.error("Graph APSP: {0} has less than 2 vertices.".format(
for i in range(0, v_cnt + 1):
Create a view that will be used to update the output table
The implementation is based on the matrix multiplication idea.
The initial output table consists of 3 sets of vertex pairs.
1) for every vervex v: v -> v, weight 0, parent v
2) for every edge v1,v2,w: v1 -> v2, weight w, parent v2
3) for every other vertex pair v1,v2: v1 -> v2, weight 'Inf',
parent NULL
The algorithm "relaxes" the paths: finds alternate paths with less
At every step, we look at every combination of non-infinite
existing paths and edges to see if we can relax a path.
Assume the graph is a chain: 1->2->3->...
The initial output table will have a finite weighted path for 1->2
and infinite for the rest. At ith iteration, the output table will
have 1->i path and will relax 1->i+1 path from infinite to a
finite value (weight of 1->i path + weight of i->i+1 edge) and
assign i as the parent.
Since using '=' with floats is dangerous we use an epsilon value
for comparison.
plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
update_sql = """ CREATE VIEW {tmp_view} AS
SELECT DISTINCT ON ({grp_o_comma} y.{src}, y.{dest})
{grp_o_comma} y.{src}, y.{dest},y.{weight}, y.parent
FROM {out_table_1} AS out
(SELECT x.{src}, x.{dest}, x.{weight},
out.{dest} as parent {comma_grp_e}
({ot_sql1}) AS out
{edge_table} AS edge
ON (out.{dest} = edge.{src} {checkg_eout})
(SELECT out.{src}, edge.{dest},
min(out.{weight}+edge.{weight}) AS {weight}
({ot_sql1}) AS out,
{edge_table} AS edge
WHERE out.{dest} = edge.{src} {checkg_eout}
GROUP BY out.{src},edge.{dest} {comma_grp_e}) x
ON (x.{src} = out.{src} AND x.{dest} = edge.{dest} {checkg_ex})
WHERE ABS(out.{weight}+edge.{weight} - x.{weight})
< {EPSILON}) y
ON (y.{src} = out.{src} AND y.{dest} = out.{dest} {checkg_oy})
WHERE y.{weight} < out.{weight}
updates = plpy.execute(
""" UPDATE {out_table}
SET {weight} = {tmp_view}.{weight},
parent = {tmp_view}.parent
FROM {tmp_view}
WHERE {tmp_view}.{src} = {out_table}.{src} AND
{tmp_view}.{dest} = {out_table}.{dest} {checkg_ot}
if updates.nrows() == 0:
# The algorithm should have reached a break command by this point.
# This check handles the existence of a negative cycle.
if i == v_cnt:
# If there are no groups, clean up and give error.
if not grouping_cols:
plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
plpy.execute("DROP TABLE IF EXISTS {0},{1}".
format(out_table, summary_table))
plpy.error("Graph APSP: Detected a negative cycle in the graph.")
# It is possible that not all groups has negative cycles.
# negs is the string created by collating grouping columns.
# By looking at the update view, we can see which groups
# are in a negative cycle.
negs = plpy.execute(
""" SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
FROM {tmp_view}
# Delete the groups with negative cycles from the output table.
sql_del = """ DELETE FROM {out_table}
USING {tmp_view}
WHERE {checkg_ot_sub}"""
plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
# If every group has a negative cycle,
# drop the output table as well.
if table_is_empty(out_table):
plpy.execute("DROP TABLE IF EXISTS {0},{1}".
format(out_table, summary_table))
"""Graph APSP: Detected a negative cycle in the """ +
"""sub-graphs of following groups: {0}.""".
plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
# Filter out the infinite paths (disconnected pairs)
plpy.execute(""" DELETE FROM {0} WHERE parent IS NULL
return None
def graph_apsp_get_path(schema_madlib, apsp_table,
source_vertex, dest_vertex, path_table, **kwargs):
Helper function that can be used to get the shortest path between any 2
@param apsp_table Name of the table that contains the APSP
@param source_vertex The vertex that will be the source of the
desired path.
@param dest_vertex The vertex that will be the destination of the
desired path.
with MinWarning("warning"):
_validate_get_path(apsp_table, source_vertex, dest_vertex, path_table)
temp1_name = unique_string(desp='temp1')
temp2_name = unique_string(desp='temp2')
summary_table = add_postfix(apsp_table, "_summary")
summary = plpy.execute("SELECT * FROM {0}".format(summary_table))
vertex_id = summary[0]['vertex_id']
edge_args = summary[0]['edge_args']
grouping_cols = summary[0]['grouping_cols']
edge_params, final_type = get_edge_params(schema_madlib, apsp_table, edge_args)
src = edge_params["src"]
dest = edge_params["dest"]
weight = edge_params["weight"]
if not vertex_id:
vertex_id = "id"
if not grouping_cols:
grouping_cols = None
select_grps = ""
check_grps_t1 = ""
check_grps_t2 = ""
grp_comma = ""
tmp = ""
if grouping_cols:
glist = split_quoted_delimited_str(grouping_cols)
select_grps = get_table_qualified_col_str(apsp_table, glist) + " , "
check_grps_t1 = " AND " + _check_groups(
apsp_table, temp1_name, glist)
check_grps_t2 = " AND " + _check_groups(
apsp_table, temp2_name, glist)
grp_comma = grouping_cols + " , "
# If the source and destination is the same vertex.
# There is no need to check the paths for any group.
if source_vertex == dest_vertex:
CREATE TABLE {path_table} AS
SELECT {grp_comma} '{{{dest_vertex}}}'::{final_type}[] AS path
FROM {apsp_table} WHERE {src} = {source_vertex} AND
{dest} = {dest_vertex}
plpy.execute("DROP TABLE IF EXISTS {0},{1}".
format(temp1_name, temp2_name))
# Initialize the temporary tables
out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
SELECT {grp_comma} {apsp_table}.parent AS {vertex_id},
ARRAY[{dest_vertex}]::{final_type}[] AS path
FROM {apsp_table}
WHERE {src} = {source_vertex} AND {dest} = {dest_vertex}
AND {apsp_table}.parent IS NOT NULL
SELECT * FROM {temp1_name} LIMIT 0
# Follow the 'parent' chain until you reach the case where the parent
# is the same as destination. This means it is the last vertex before
# the source.
while out.nrows() > 0:
plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
# If the parent id is not the same as dest,
# that means we have to follow the chain:
# Add it to the path and move to its parent
out = plpy.execute(
""" INSERT INTO {temp2_name}
SELECT {select_grps} {apsp_table}.parent AS {vertex_id},
{apsp_table}.{dest} || {temp1_name}.path AS path
FROM {apsp_table} INNER JOIN {temp1_name} ON
({apsp_table}.{dest} = {temp1_name}.{vertex_id}
WHERE {src} = {source_vertex} AND
{apsp_table}.parent <> {apsp_table}.{dest}
tmp = temp2_name
temp2_name = temp1_name
temp1_name = tmp
tmp = check_grps_t1
check_grps_t1 = check_grps_t2
check_grps_t2 = tmp
# We have to consider 3 cases.
# 1) The path has more than 2 vertices:
# Add the current parent and the source vertex
# 2) The path has exactly 2 vertices (an edge between src and dest is
# the shortest path).
# Add the source vertex
# 3) The path has 0 vertices (unreachable)
# Add an empty array.
# Path with 1 vertex (src == dest) has been handled before
CREATE TABLE {path_table} AS
SELECT {grp_comma} {source_vertex}::{final_type} || ({vertex_id} || path) AS path
FROM {temp2_name}
WHERE {vertex_id} <> {dest_vertex}
SELECT {grp_comma} {source_vertex}::{final_type} || path AS path
FROM {temp2_name}
WHERE {vertex_id} = {dest_vertex}
SELECT {grp_comma} '{{}}'::{final_type}[] AS path
FROM {apsp_table}
WHERE {src} = {source_vertex} AND {dest} = {dest_vertex}
AND {apsp_table}.parent IS NULL
out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))
if out.nrows() == 0:
plpy.error(("Graph APSP: Vertex {0} and/or {1} is not present" +
" in the APSP table {1}").format(
source_vertex, dest_vertex, apsp_table))
plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}".
return None
def _validate_apsp(vertex_table, vertex_id, edge_table, edge_params,
out_table, glist, **kwargs):
validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
out_table, 'APSP')
vt_error = plpy.execute(
""" SELECT {vertex_id}
FROM {vertex_table}
WHERE {vertex_id} IS NOT NULL
GROUP BY {vertex_id}
HAVING count(*) > 1 """.format(**locals()))
if vt_error.nrows() != 0:
"""Graph APSP: Source vertex table {vertex_table} contains duplicate vertex id's.""".
summary_table = add_postfix(out_table, "_summary")
_assert(not table_exists(summary_table),
"Graph APSP: Output summary table already exists!")
if glist is not None:
_assert(columns_exist_in_table(edge_table, glist),
"""Graph APSP: Not all columns from {glist} are present in edge table ({edge_table}).""".
def _validate_get_path(apsp_table, source_vertex, dest_vertex,
path_table, **kwargs):
_assert(apsp_table and apsp_table.strip().lower() not in ('null', ''),
"Graph APSP: Invalid APSP table name!")
"Graph APSP: APSP table ({0}) is missing!".format(apsp_table))
_assert(not table_is_empty(apsp_table),
"Graph APSP: APSP table ({0}) is empty!".format(apsp_table))
summary_table = add_postfix(apsp_table, "_summary")
"Graph APSP: APSP summary table ({0}) is missing!".format(summary_table))
_assert(not table_is_empty(summary_table),
"Graph APSP: APSP summary table ({0}) is empty!".format(summary_table))
_assert(not table_exists(path_table),
"Graph APSP: Output path table already exists!")
return None
def graph_apsp_help(schema_madlib, message, **kwargs):
Help function for graph_apsp and graph_apsp_get_path
@param schema_madlib
@param message: string, Help message string
@param kwargs
String. Help/usage information
if not message:
help_string = """
Given a graph, all pairs shortest path (APSP) algorithm finds a path for
every vertex pair such that the sum of the weights of its constituent
edges is minimized.
For more details on function usage:
SELECT {schema_madlib}.graph_apsp('usage')
elif message.lower() in ['usage', 'help', '?']:
help_string = """
Given a graph, all pairs shortest path (apsp) algorithm finds a path for
every vertex pair such that the sum of the weights of its constituent
edges is minimized.
To retrieve the path for a specific vertex pair:
SELECT {schema_madlib}.graph_apsp_get_path(
apsp_table TEXT, -- Name of the table that contains the apsp output.
source_vertex INT, -- The vertex that will be the source of the
-- desired path.
dest_vertex INT, -- The vertex that will be the destination of the
-- desired path.
path_table TEXT -- Name of the output table that contains the path.
The output of apsp ('out_table' above) contains a row for every vertex of
every group and has the following columns (in addition to the grouping
- source_vertex : The id for the source vertex. Will use the input edge
column 'src' for column naming.
- dest_vertex : The id for the destination vertex. Will use the input edge
column 'dest' for column naming.
- weight : The total weight of the shortest path from the source
vertex to this particular vertex.
Will use the input parameter 'weight' for column naming.
- parent : The parent of the destination vertex in the shortest path
from source. praent will be equal to dest_vertex is there
are no more intermediary vertices. Will use 'parent' for
column naming.
The output of graph_apsp_get_path ('path_table' above) contains a row for
every group and has the following columns:
- grouping_cols : The grouping columns given in the creation of the apsp
table. If there are no grouping columns, these columns
will not exist and the table will have a single row.
- path (ARRAY) : The shortest path from the source vertex to the
destination vertex.
help_string = "No such option. Use {schema_madlib}.graph_apsp()"
return help_string.format(schema_madlib=schema_madlib,
graph_usage=get_graph_usage(schema_madlib, 'graph_apsp',
"""out_table TEXT, -- Name of the table to store the result of apsp.
grouping_cols TEXT -- The list of grouping columns."""))
# ---------------------------------------------------------------------