| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| # Breadth-First Search |
| |
| # Please refer to the bfs.sql_in file for the documentation |
| |
| """ |
| @file bfs.py_in |
| |
| @namespace graph |
| """ |
| |
| import plpy |
| from graph_utils import validate_graph_coding |
| from graph_utils import get_graph_usage |
| from utilities.control import MinWarning |
| from utilities.utilities import _assert |
| from utilities.utilities import _check_groups |
| from utilities.utilities import get_table_qualified_col_str |
| from utilities.utilities import _grp_null_checks |
| from utilities.utilities import add_postfix |
| from utilities.utilities import extract_keyvalue_params |
| from utilities.utilities import unique_string, split_quoted_delimited_str |
| from utilities.validate_args import table_exists |
| from utilities.validate_args import columns_exist_in_table |
| |
# m4 directive (this template is presumably run through m4 before being
# loaded as Python): switch the m4 quote delimiters to <! and !>, which is the
# quoting form used by the m4_ifdef calls inside graph_bfs below.
m4_changequote(`<!', `!>')
| |
def _validate_bfs(vertex_table, vertex_id, edge_table, edge_params,
                  source_vertex, out_table, max_distance, directed,
                  grouping_cols_list, **kwargs):
    """
    Validate the arguments of graph_bfs before any output is created.

    Problems are reported through _assert and plpy.error.

    Args:
        @param vertex_table        Name of the table that contains the vertex data.
        @param vertex_id           Name of the column containing the vertex ids.
        @param edge_table          Name of the table that contains the edge data.
        @param edge_params         Dictionary with the 'src' and 'dest' column names.
        @param source_vertex       The source vertex id; must be an integer that
                                   is present in vertex_table.
        @param out_table           Name of the table to store the result of BFS.
        @param max_distance        Integer in the range [0, 2147483647].
        @param directed            Boolean flag.
        @param grouping_cols_list  List of grouping column names, or None.

    Returns:
        None
    """
    validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
                          out_table, 'BFS')

    # The type is checked before the value so that a non-integer argument is
    # never compared against a number, and the upper bound named in the
    # message is actually enforced.
    _assert(isinstance(max_distance, int) and
            0 <= max_distance <= 2147483647,
        """Graph BFS: Invalid max_distance type or value ({0}), must be integer,
        be greater than or equal to 0 and be less than max allowable integer
        (2147483647).""".
        format(max_distance))

    _assert(isinstance(directed, bool),
        """Graph BFS: Invalid value for directed ({0}), must be boolean.""".
        format(directed))

    # source_vertex is interpolated into SQL below, so its type is checked
    # before it is used in any query.
    _assert(isinstance(source_vertex, int),
        """Graph BFS: Source vertex {source_vertex} has to be an integer.""".
        format(**locals()))

    # Only existence matters here, so a single constant row is enough.
    src_exists = plpy.execute("""
        SELECT 1 FROM {vertex_table}
        WHERE {vertex_id}={source_vertex}
        LIMIT 1
        """.format(**locals()))
    if src_exists.nrows() == 0:
        plpy.error(
            """Graph BFS: Source vertex {source_vertex} is not present in the
            vertex table {vertex_table}.""".
            format(**locals()))

    # Any non-NULL vertex id that appears more than once is an error.
    vt_error = plpy.execute(
        """ SELECT {vertex_id}
            FROM {vertex_table}
            WHERE {vertex_id} IS NOT NULL
            GROUP BY {vertex_id}
            HAVING count(*) > 1 """.format(**locals()))
    if vt_error.nrows() != 0:
        plpy.error(
            """Graph BFS: Source vertex table {vertex_table} contains duplicate
            vertex id's.""".
            format(**locals()))

    summary_table = add_postfix(out_table, "_summary")
    _assert(not table_exists(summary_table),
        "Graph BFS: Output summary table already exists!")

    if grouping_cols_list is not None:
        _assert(columns_exist_in_table(edge_table, grouping_cols_list),
            """Graph BFS: Not all columns from {grouping_cols_list} are present
            in edge table ({edge_table}).""".
            format(**locals()))

    return None
| |
| |
| def graph_bfs(schema_madlib, vertex_table, vertex_id, edge_table, |
| edge_args, source_vertex, out_table, max_distance, directed, grouping_cols, |
| **kwargs): |
| |
| """ |
| Breadth First Search algorithm for graphs [1]. |
| Args: |
| @param vertex_table Name of the table that contains the vertex data. |
| @param vertex_id Name of the column containing the vertex ids. |
| @param edge_table Name of the table that contains the edge data. |
| @param edge_args A comma-delimited string containing multiple |
| named arguments of the form "name=value". |
| @param source_vertex The source vertex id for the algorithm to start. |
| @param out_table Name of the table to store the result of BFS. |
| @param max_distance Maximum distance from the source_vertex to search for. |
| @param directed Graph will be treated as directed if this boolean flag |
| is set to TRUE. Graph is treated as undirected by default. |
| @param grouping_cols The list of grouping columns. |
| |
| [1] https://en.wikipedia.org/wiki/Breadth-first_search |
| """ |
| |
| with MinWarning("warning"): |
| |
| INT_MAX = 2147483647 |
| |
| params_types = {'src': str, 'dest': str} |
| default_args = {'src': 'src', 'dest': 'dest'} |
| edge_params = extract_keyvalue_params(edge_args, |
| params_types, |
| default_args) |
| |
| # Prepare the input for recording in the summary table |
| if not vertex_id: |
| v_st = '' |
| vertex_id = "id" |
| else: |
| v_st = vertex_id |
| |
| if not edge_args: |
| e_st = '' |
| else: |
| e_st = edge_args |
| |
| if not grouping_cols: |
| g_st = '' |
| glist = None |
| else: |
| g_st = grouping_cols |
| glist = split_quoted_delimited_str(grouping_cols) |
| |
| if max_distance is None: |
| d_st= "NULL" |
| max_distance = INT_MAX |
| else: |
| d_st = max_distance |
| |
| if directed is None: |
| dir_st= "NULL" |
| directed = False |
| else: |
| dir_st = directed |
| |
| src = edge_params["src"] |
| dest = edge_params["dest"] |
| |
| distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>, |
| <!"DISTRIBUTED BY ({0})".format(vertex_id)!>) |
| local_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>, |
| <!"DISTRIBUTED RANDOMLY"!>) |
| |
| _validate_bfs(vertex_table, vertex_id, edge_table, |
| edge_params, source_vertex, out_table, max_distance, directed, glist) |
| |
| subq = unique_string(desp='subq') |
| subq1 = unique_string(desp='subq1') |
| sube = unique_string(desp='edge') |
| sube1 = unique_string(desp='edge1') |
| |
| # Initialize grouping related variables |
| insert_qry_undirected_init = "" |
| grp_comma = "" |
| and_grp_null_checks = "" |
| grp_sube_comma = "" |
| grp_sube1_comma = "" |
| subq_grp_join = '' |
| |
| if grouping_cols and grouping_cols is not '': |
| grp_comma = grouping_cols + ", " |
| and_grp_null_checks = " AND " + _grp_null_checks(glist) |
| grp_sube_comma = get_table_qualified_col_str(sube, glist) + " , " |
| grp_sube1_comma = get_table_qualified_col_str(sube1, glist) + " , " |
| |
| # We keep a table of every vertex, the distance to that vertex from source |
| # and the parent in the path to the vertex. |
| # This table will be updated throughout the execution. |
| dist_col = "dist" |
| parent_col = "parent" |
| curr_dist_val = 0 |
| |
| # Creating the output table with the appropriate columns and data types |
| plpy.execute(""" |
| CREATE TABLE {out_table} AS ( |
| SELECT |
| {grp_comma} |
| {src} AS {vertex_id}, |
| {curr_dist_val}::INT AS {dist_col}, |
| {src} AS {parent_col} |
| FROM {edge_table} |
| LIMIT 0 |
| ) {distribution}""".format(**locals())) |
| |
| # We keep a summary table to keep track of the parameters used for this |
| # BFS run |
| summary_table = add_postfix(out_table, "_summary") |
| plpy.execute( """ |
| CREATE TABLE {summary_table} ( |
| vertex_table TEXT, |
| vertex_id TEXT, |
| edge_table TEXT, |
| edge_args TEXT, |
| source_vertex INTEGER, |
| out_table TEXT, |
| max_distance INTEGER, |
| directed BOOLEAN, |
| grouping_cols TEXT |
| ) |
| """.format(**locals())) |
| |
| plpy.execute(""" |
| INSERT INTO {summary_table} VALUES |
| ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}', |
| {source_vertex}, '{out_table}', {d_st}, {dir_st}, '{g_st}') |
| """.format(**locals())) |
| |
| # The queries for directed and undirected graphs share a common section. |
| # There are additional clauses added to the undirected graph queries. |
| # In the undirected case edges can be considered to go from {src} to |
| # {dest} and {dest} to {src} |
| |
| if not directed: |
| insert_qry_undirected_init = """ OR {dest} = {source_vertex} |
| """.format(**locals()) |
| |
| # This step inserts into the output table the source vertex for each |
| # group in which it is present. Grouping behavior is not predictable |
| # when there are NULLs in any grouping column. Therefore those rows |
| # are explicitly removed from analysis |
| insert_qry_init = """ |
| INSERT INTO {out_table} |
| SELECT {grp_comma} |
| {source_vertex} AS {vertex_id}, |
| {curr_dist_val} AS {dist_col}, |
| NULL AS {parent_col} |
| FROM {edge_table} |
| WHERE ({src} = {source_vertex} {insert_qry_undirected_init}) |
| {and_grp_null_checks} |
| GROUP BY {grp_comma} {vertex_id}, {dist_col} |
| """.format(**locals()) |
| plpy.execute(insert_qry_init.format(**locals())) |
| |
| # Create a table that will hold the new vertices to be explored next. |
| message = unique_string(desp='message') |
| plpy.execute(""" |
| CREATE TEMP TABLE {message} AS |
| SELECT {grp_comma} {vertex_id}, {parent_col} |
| FROM {out_table} |
| """.format(**locals())) |
| |
| # After initialization of the output table, number of nodes connected |
| # by edges to the source vertex in each group is counted. This is also used |
| # below in the BFS iteration while-loop |
| edge_grp_join = "" |
| subq1_grp_join = "" |
| if grouping_cols: |
| subq_grp_join = ' AND ' + _check_groups(subq, sube, glist) |
| subq1_grp_join = ' AND ' + _check_groups(subq1, sube1, glist) |
| edge_grp_join = ' AND ' + _check_groups(edge_table, out_table, glist) |
| |
| count_qry = """ SELECT count(*) AS count FROM {message} |
| """.format(**locals()) |
| vct = plpy.execute(count_qry.format(**locals()))[0]['count'] |
| |
| # This insert statement is executed within the BFS iteration while-loop |
| # below. It is used to discover and store all nodes (not already found) |
| # connected to those found in the immediate previous iteration, which |
| # are stored in the {message} table. |
| toupdate = unique_string(desp='toupdate') |
| insert_toupdate_table = """ |
| CREATE TEMP TABLE {toupdate} AS |
| SELECT {grp_sube_comma} {sube}.{dest} AS {vertex_id}, {sube}.{src} AS {parent_col} |
| FROM ( |
| SELECT {grp_comma} {src}, {dest} |
| FROM {edge_table} |
| WHERE NOT EXISTS ( |
| SELECT 1 |
| FROM {out_table} |
| WHERE {out_table}.{vertex_id} = {edge_table}.{dest} |
| {edge_grp_join} |
| ) |
| ) AS {sube} |
| INNER JOIN {message} AS {subq} |
| ON ({sube}.{src}={subq}.{vertex_id} {subq_grp_join}) |
| """ |
| if not directed: |
| insert_toupdate_table += """ |
| UNION ALL |
| SELECT {grp_sube1_comma} {sube1}.{src} AS {vertex_id}, |
| {sube1}.{dest} AS {parent_col} |
| FROM ( |
| SELECT {grp_comma} {src}, {dest} |
| FROM {edge_table} |
| WHERE NOT EXISTS ( |
| SELECT 1 |
| FROM {out_table} |
| WHERE {out_table}.{vertex_id} = {edge_table}.{src} |
| {edge_grp_join} |
| ) |
| ) AS {sube1} |
| INNER JOIN {message} AS {subq1} |
| ON ({sube1}.{dest}={subq1}.{vertex_id} {subq1_grp_join}) |
| """.format(**locals()) |
| |
| insert_message_loop = """ |
| CREATE TEMP TABLE {message} AS |
| SELECT {grp_comma} {vertex_id}, {{curr_dist_val}}+1 AS {dist_col}, |
| MIN({parent_col}) |
| FROM {toupdate} |
| GROUP BY {grp_comma} {vertex_id} |
| """.format(**locals()) |
| |
| insert_qry_loop = """ |
| INSERT INTO {out_table} |
| SELECT * FROM {message} |
| """.format(**locals()) |
| |
| # Main loop for traversing the graph |
| while vct > 0 and curr_dist_val < max_distance: |
| # The loop consists of two steps: |
| # 1) Disover and store all nodes that are linked to nodes found in |
| # the immediate previous iteration of the loop that have not already |
| # been found in all previous iterations |
| # 2) Check for any nodes linked to those discovered in Step 1 above |
| # that have not yet been discovered |
| # |
| # If a node has multiple possible parents then the parent with the |
| # smallest ID is chosen for output |
| |
| # In the directed graph case only nodes in the {dest} column of |
| # the edge table are searched to find new nodes reachable from |
| # previously discovered nodes |
| |
| # In the undirected graph case edges are treated as non-directional |
| # (or bidirectional). Nodes in both the {src} and {dest} columns of |
| # the edge table are searched to find new nodes reachable from |
| # previously discovered nodes. |
| # |
| # This approach does NOT require the user to provide a forward edge |
| # and a reverse edge between the same two nodes to indicate the |
| # graph's undirected nature. However, it will work in that scenario |
| # as well. |
| |
| # Discover and store all nodes (not already found) connected to |
| # those found in the immediate previous iteration |
| plpy.execute("DROP TABLE IF EXISTS {0}".format(toupdate)) |
| plpy.execute(insert_toupdate_table.format(**locals())) |
| |
| plpy.execute("DROP TABLE IF EXISTS {0}".format(message)) |
| plpy.execute(insert_message_loop.format(**locals())) |
| |
| plpy.execute(insert_qry_loop.format(**locals())) |
| |
| # Update distance value for next iteration |
| curr_dist_val = curr_dist_val + 1 |
| |
| # Count / find any nodes that are connected to those discovered and |
| # stored in this iteration. This is used to check if the iterations |
| # need to continue. |
| vct = plpy.execute(count_qry.format(**locals()))[0]['count'] |
| |
| # Filter out the infinite paths (disconnected pairs) |
| plpy.execute(""" UPDATE {out_table} SET parent = {source_vertex} |
| WHERE {vertex_id} = {source_vertex} |
| """.format(**locals())) |
| |
| plpy.execute(""" DELETE FROM {0} WHERE parent IS NULL |
| """.format(out_table)) |
| # Drop temp tables |
| plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(toupdate, message)) |
| |
| return None |
| |
def graph_bfs_help(schema_madlib, message, **kwargs):
    """
    Help function for graph_bfs

    Args:
        @param schema_madlib  Name of the MADlib schema, substituted into the
                              returned text.
        @param message: string, Help message string. An empty value selects
                              the summary; 'usage', 'help' or '?' (case
                              insensitive) selects the detailed usage; any
                              other value selects a "No such option" notice.
        @param kwargs

    Returns:
        String. Help/usage information
    """
    if not message:
        help_string = """
-----------------------------------------------------------------------
                            SUMMARY
-----------------------------------------------------------------------

Given a graph and a source vertex, the Breadth-first Search (BFS) algorithm
finds all nodes reachable from the source vertex.

For more details on function usage:
    SELECT {schema_madlib}.graph_bfs('usage')
            """
    elif message.lower() in ['usage', 'help', '?']:
        # NOTE(review): graph_bfs updates the parent of the source vertex row
        # to source_vertex before returning, which does not match the
        # "value for parent is NULL" statement below -- confirm which one is
        # intended.
        help_string = """
Given a graph and a source vertex, the Breadth-first Search (BFS) algorithm
finds all nodes reachable from the source vertex.

{graph_usage}

----------------------------------------------------------------------------
                            OUTPUT
----------------------------------------------------------------------------
The output of BFS ('out_table' above) contains a row for every vertex that is
reachable from the source_vertex. In the presence of grouping columns, only those
edges are used for which there are no NULL values in any grouping column.
The output table will have the following columns (in addition to the
grouping columns):
  - vertex_id : The id for any node reachable from source_vertex in addition to
                the source_vertex. Will use the input parameter 'vertex_id' for
                column naming.
  - dist      : The distance in number of edges (or hops) from the source_vertex
                to where this vertex is located.
  - parent    : The parent of this vertex in BFS traversal of the graph from
                source_vertex. Will use 'parent' for column naming. For the
                case where vertex_id = source_vertex, the value for parent is NULL.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.graph_bfs()"

    # 'directed' is documented as BOOLEAN: graph_bfs validates it as a boolean
    # and records it in a BOOLEAN column of the summary table.
    return help_string.format(schema_madlib=schema_madlib,
        graph_usage=get_graph_usage(schema_madlib, 'graph_bfs',
    """source_vertex INT,     -- The source vertex id for the algorithm to start.
    out_table     TEXT,    -- Name of the table to store the result of BFS.
    max_distance  INT,     -- Maximum distance from source_vertex to search through in the graph.
    directed      BOOLEAN, -- If TRUE the graph will be treated as directed.
    grouping_cols TEXT     -- A comma-separated list of grouping columns."""))
| # --------------------------------------------------------------------- |