blob: e802aac14d5c1522c901200db99898dcef6b5c98 [file] [log] [blame]
# coding=utf-8
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Breadth-First Search
# Please refer to the bfs.sql_in file for the documentation
@file bfs.py_in
@namespace graph
import plpy
from graph_utils import validate_graph_coding
from graph_utils import get_graph_usage
from utilities.control import MinWarning
from utilities.utilities import _assert
from utilities.utilities import _check_groups
from utilities.utilities import get_table_qualified_col_str
from utilities.utilities import _grp_null_checks
from utilities.utilities import add_postfix
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string, split_quoted_delimited_str
from utilities.validate_args import table_exists
from utilities.validate_args import columns_exist_in_table
m4_changequote(`<!', `!>')
def _validate_bfs(vertex_table, vertex_id, edge_table, edge_params,
source_vertex, out_table, max_distance, directed, grouping_cols_list, **kwargs):
validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
_assert((max_distance >= 0) and isinstance(max_distance,int),
"""Graph BFS: Invalid max_distance type or value ({0}), must be integer,
be greater than or equal to 0 and be less than max allowable integer
"""Graph BFS: Invalid value for directed ({0}), must be boolean.""".
"""Graph BFS: Source vertex {source_vertex} has to be an integer.""".
src_exists = plpy.execute("""
SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex}
if src_exists.nrows() == 0:
"""Graph BFS: Source vertex {source_vertex} is not present in the
vertex table {vertex_table}.""".
vt_error = plpy.execute(
""" SELECT {vertex_id}
FROM {vertex_table}
WHERE {vertex_id} IS NOT NULL
GROUP BY {vertex_id}
HAVING count(*) > 1 """.format(**locals()))
if vt_error.nrows() != 0:
"""Graph BFS: Source vertex table {vertex_table} contains duplicate
vertex id's.""".
summary_table = add_postfix(out_table, "_summary")
_assert(not table_exists(summary_table),
"Graph BFS: Output summary table already exists!")
if grouping_cols_list is not None:
_assert(columns_exist_in_table(edge_table, grouping_cols_list),
"""Graph BFS: Not all columns from {grouping_cols_list} are present
in edge table ({edge_table}).""".
return None
def graph_bfs(schema_madlib, vertex_table, vertex_id, edge_table,
edge_args, source_vertex, out_table, max_distance, directed, grouping_cols,
Breadth First Search algorithm for graphs [1].
@param vertex_table Name of the table that contains the vertex data.
@param vertex_id Name of the column containing the vertex ids.
@param edge_table Name of the table that contains the edge data.
@param edge_args A comma-delimited string containing multiple
named arguments of the form "name=value".
@param source_vertex The source vertex id for the algorithm to start.
@param out_table Name of the table to store the result of BFS.
@param max_distance Maximum distance from the source_vertex to search for.
@param directed Graph will be treated as directed if this boolean flag
is set to TRUE. Graph is treated as undirected by default.
@param grouping_cols The list of grouping columns.
with MinWarning("warning"):
INT_MAX = 2147483647
params_types = {'src': str, 'dest': str}
default_args = {'src': 'src', 'dest': 'dest'}
edge_params = extract_keyvalue_params(edge_args,
# Prepare the input for recording in the summary table
if not vertex_id:
v_st = ''
vertex_id = "id"
v_st = vertex_id
if not edge_args:
e_st = ''
e_st = edge_args
if not grouping_cols:
g_st = ''
glist = None
g_st = grouping_cols
glist = split_quoted_delimited_str(grouping_cols)
if max_distance is None:
d_st= "NULL"
max_distance = INT_MAX
d_st = max_distance
if directed is None:
dir_st= "NULL"
directed = False
dir_st = directed
src = edge_params["src"]
dest = edge_params["dest"]
distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
<!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
local_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
_validate_bfs(vertex_table, vertex_id, edge_table,
edge_params, source_vertex, out_table, max_distance, directed, glist)
subq = unique_string(desp='subq')
subq1 = unique_string(desp='subq1')
sube = unique_string(desp='edge')
sube1 = unique_string(desp='edge1')
# Initialize grouping related variables
insert_qry_undirected_init = ""
grp_comma = ""
and_grp_null_checks = ""
grp_sube_comma = ""
grp_sube1_comma = ""
subq_grp_join = ''
if grouping_cols and grouping_cols is not '':
grp_comma = grouping_cols + ", "
and_grp_null_checks = " AND " + _grp_null_checks(glist)
grp_sube_comma = get_table_qualified_col_str(sube, glist) + " , "
grp_sube1_comma = get_table_qualified_col_str(sube1, glist) + " , "
# We keep a table of every vertex, the distance to that vertex from source
# and the parent in the path to the vertex.
# This table will be updated throughout the execution.
dist_col = "dist"
parent_col = "parent"
curr_dist_val = 0
# Creating the output table with the appropriate columns and data types
CREATE TABLE {out_table} AS (
{src} AS {vertex_id},
{curr_dist_val}::INT AS {dist_col},
{src} AS {parent_col}
FROM {edge_table}
) {distribution}""".format(**locals()))
# We keep a summary table to keep track of the parameters used for this
# BFS run
summary_table = add_postfix(out_table, "_summary")
plpy.execute( """
CREATE TABLE {summary_table} (
vertex_table TEXT,
vertex_id TEXT,
edge_table TEXT,
edge_args TEXT,
source_vertex INTEGER,
out_table TEXT,
max_distance INTEGER,
directed BOOLEAN,
grouping_cols TEXT
INSERT INTO {summary_table} VALUES
('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
{source_vertex}, '{out_table}', {d_st}, {dir_st}, '{g_st}')
# The queries for directed and undirected graphs share a common section.
# There are additional clauses added to the undirected graph queries.
# In the undirected case edges can be considered to go from {src} to
# {dest} and {dest} to {src}
if not directed:
insert_qry_undirected_init = """ OR {dest} = {source_vertex}
# This step inserts into the output table the source vertex for each
# group in which it is present. Grouping behavior is not predictable
# when there are NULLs in any grouping column. Therefore those rows
# are explicitly removed from analysis
insert_qry_init = """
INSERT INTO {out_table}
SELECT {grp_comma}
{source_vertex} AS {vertex_id},
{curr_dist_val} AS {dist_col},
NULL AS {parent_col}
FROM {edge_table}
WHERE ({src} = {source_vertex} {insert_qry_undirected_init})
GROUP BY {grp_comma} {vertex_id}, {dist_col}
# Create a table that will hold the new vertices to be explored next.
message = unique_string(desp='message')
SELECT {grp_comma} {vertex_id}, {parent_col}
FROM {out_table}
# After initialization of the output table, number of nodes connected
# by edges to the source vertex in each group is counted. This is also used
# below in the BFS iteration while-loop
edge_grp_join = ""
subq1_grp_join = ""
if grouping_cols:
subq_grp_join = ' AND ' + _check_groups(subq, sube, glist)
subq1_grp_join = ' AND ' + _check_groups(subq1, sube1, glist)
edge_grp_join = ' AND ' + _check_groups(edge_table, out_table, glist)
count_qry = """ SELECT count(*) AS count FROM {message}
vct = plpy.execute(count_qry.format(**locals()))[0]['count']
# This insert statement is executed within the BFS iteration while-loop
# below. It is used to discover and store all nodes (not already found)
# connected to those found in the immediate previous iteration, which
# are stored in the {message} table.
toupdate = unique_string(desp='toupdate')
insert_toupdate_table = """
SELECT {grp_sube_comma} {sube}.{dest} AS {vertex_id}, {sube}.{src} AS {parent_col}
SELECT {grp_comma} {src}, {dest}
FROM {edge_table}
FROM {out_table}
WHERE {out_table}.{vertex_id} = {edge_table}.{dest}
) AS {sube}
INNER JOIN {message} AS {subq}
ON ({sube}.{src}={subq}.{vertex_id} {subq_grp_join})
if not directed:
insert_toupdate_table += """
SELECT {grp_sube1_comma} {sube1}.{src} AS {vertex_id},
{sube1}.{dest} AS {parent_col}
SELECT {grp_comma} {src}, {dest}
FROM {edge_table}
FROM {out_table}
WHERE {out_table}.{vertex_id} = {edge_table}.{src}
) AS {sube1}
INNER JOIN {message} AS {subq1}
ON ({sube1}.{dest}={subq1}.{vertex_id} {subq1_grp_join})
insert_message_loop = """
SELECT {grp_comma} {vertex_id}, {{curr_dist_val}}+1 AS {dist_col},
FROM {toupdate}
GROUP BY {grp_comma} {vertex_id}
insert_qry_loop = """
INSERT INTO {out_table}
SELECT * FROM {message}
# Main loop for traversing the graph
while vct > 0 and curr_dist_val < max_distance:
# The loop consists of two steps:
# 1) Disover and store all nodes that are linked to nodes found in
# the immediate previous iteration of the loop that have not already
# been found in all previous iterations
# 2) Check for any nodes linked to those discovered in Step 1 above
# that have not yet been discovered
# If a node has multiple possible parents then the parent with the
# smallest ID is chosen for output
# In the directed graph case only nodes in the {dest} column of
# the edge table are searched to find new nodes reachable from
# previously discovered nodes
# In the undirected graph case edges are treated as non-directional
# (or bidirectional). Nodes in both the {src} and {dest} columns of
# the edge table are searched to find new nodes reachable from
# previously discovered nodes.
# This approach does NOT require the user to provide a forward edge
# and a reverse edge between the same two nodes to indicate the
# graph's undirected nature. However, it will work in that scenario
# as well.
# Discover and store all nodes (not already found) connected to
# those found in the immediate previous iteration
plpy.execute("DROP TABLE IF EXISTS {0}".format(toupdate))
plpy.execute("DROP TABLE IF EXISTS {0}".format(message))
# Update distance value for next iteration
curr_dist_val = curr_dist_val + 1
# Count / find any nodes that are connected to those discovered and
# stored in this iteration. This is used to check if the iterations
# need to continue.
vct = plpy.execute(count_qry.format(**locals()))[0]['count']
# Filter out the infinite paths (disconnected pairs)
plpy.execute(""" UPDATE {out_table} SET parent = {source_vertex}
WHERE {vertex_id} = {source_vertex}
plpy.execute(""" DELETE FROM {0} WHERE parent IS NULL
# Drop temp tables
plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(toupdate, message))
return None
def graph_bfs_help(schema_madlib, message, **kwargs):
Help function for graph_bfs
@param schema_madlib
@param message: string, Help message string
@param kwargs
String. Help/usage information
if not message:
help_string = """
Given a graph and a source vertex, the Breadth-first Search (BFS) algorithm
finds all nodes reachable from the source vertex.
For more details on function usage:
SELECT {schema_madlib}.graph_bfs('usage')
elif message.lower() in ['usage', 'help', '?']:
help_string = """
Given a graph and a source vertex, the Breadth-first Search (BFS) algorithm
finds all nodes reachable from the source vertex.
The output of BFS ('out_table' above) contains a row for every vertex of that is
reachable from the source_vertex. In the presence of grouping columns, only those
edges are used for which there are no NULL values in any grouping column.
The output table will have the following columns (in addition to the
grouping columns):
- vertex_id : The id for any node reachable from source_vertex in addition to
the source_vertex. Will use the input parameter 'vertex_id' for
column naming.
- dist : The distance in number of edges (or hops) from the source_vertex
to where this vertex is located.
- parent : The parent of this vertex in BFS traversal of the graph from
source_vertex. Will use 'parent' for column naming. For the
case where vertex_id = source_vertex, the value for parent is NULL.
help_string = "No such option. Use {schema_madlib}.graph_bfs()"
return help_string.format(schema_madlib=schema_madlib,
graph_usage=get_graph_usage(schema_madlib, 'graph_bfs',
"""source_vertex INT, -- The source vertex id for the algorithm to start.
out_table TEXT, -- Name of the table to store the result of BFS.
max_distance INT, -- Maximum distance from source_vertex to search through in the graph.
directed INT, -- If TRUE the graph will be treated as directed.
grouping_cols TEXT -- A comma-separated list of grouping columns."""))
# ---------------------------------------------------------------------