| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| # PageRank |
| |
| # Please refer to the pagerank.sql_in file for the documentation |
| |
| """ |
| @file pagerank.py_in |
| |
| @namespace graph |
| """ |
| |
| import plpy |
| from utilities.control import MinWarning |
| from utilities.utilities import _assert |
| from utilities.utilities import extract_keyvalue_params |
| from utilities.utilities import unique_string, split_quoted_delimited_str |
| from utilities.validate_args import columns_exist_in_table, get_cols_and_types |
| from graph_utils import * |
| |
| m4_changequote(`<!', `!>') |
| |
def validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
                           edge_params, out_table, damping_factor, max_iter,
                           threshold, grouping_cols_list, module_name):
    """
    Validate every input parameter of the PageRank entry function.

    Errors out (via _assert / validate_graph_coding) on the first invalid
    parameter; returns None when everything checks out.
    """
    # Shared vertex/edge/output-table sanity checks for graph modules.
    validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
                          out_table, module_name)
    _assert(0.0 <= damping_factor <= 1.0,
            """PageRank: Invalid damping factor value ({0}), must be between 0 and 1.""".
            format(damping_factor))
    # threshold may be None (default is derived later) or 0 (disables the
    # convergence check); otherwise it must lie in [0, 1].
    _assert(not threshold or 0.0 <= threshold <= 1.0,
            """PageRank: Invalid threshold value ({0}), must be between 0 and 1.""".
            format(threshold))
    _assert(max_iter > 0,
            """PageRank: Invalid max_iter value ({0}), must be a positive integer.""".
            format(max_iter))
    if grouping_cols_list:
        # Grouping columns must be plain column names present in the edge
        # table; expressions are not supported.
        _assert(columns_exist_in_table(edge_table, grouping_cols_list, schema_madlib),
                "PageRank error: One or more grouping columns specified do not exist!")
| |
def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
             out_table, damping_factor, max_iter, threshold, grouping_cols,
             **kwargs):
    """
    Function that computes the PageRank of all vertices in a directed graph.

    Args:
        @param schema_madlib   Schema in which MADlib is installed
        @param vertex_table    Table with one row per vertex
        @param vertex_id       Name of the vertex id column (default 'id');
                               assumed INTEGER (see the out_table DDL below)
        @param edge_table      Table of directed edges
        @param edge_args       Comma-delimited 'key=value' string naming the
                               source/destination columns, e.g. 'src=src, dest=dest'
        @param out_table       Output table to create, one row per vertex
                               with its final pagerank (plus grouping cols)
        @param damping_factor  Probability of following an outgoing edge
                               (default 0.85)
        @param max_iter        Maximum number of iterations (default 100)
        @param threshold       Convergence criterion; 0 disables the check so
                               exactly max_iter iterations run (default
                               1/(N*100) where N = number of vertices)
        @param grouping_cols   Optional comma-delimited edge-table columns;
                               PageRank is computed per group when set

    Side effects:
        Creates <out_table> and <out_table>_summary (iteration counts).
    """
    # Remember the session's message level so it can be restored on exit;
    # warnings-only keeps the many DDL statements below quiet.
    old_msg_level = plpy.execute("""
        SELECT setting
        FROM pg_settings
        WHERE name='client_min_messages'
        """)[0]['setting']
    plpy.execute('SET client_min_messages TO warning')
    params_types = {'src': str, 'dest': str}
    default_args = {'src': 'src', 'dest': 'dest'}
    edge_params = extract_keyvalue_params(edge_args, params_types, default_args)

    # populate default values for optional params if null
    if damping_factor is None:
        damping_factor = 0.85
    if max_iter is None:
        max_iter = 100
    if vertex_id is None:
        vertex_id = "id"
    if not grouping_cols:
        grouping_cols = ''

    grouping_cols_list = split_quoted_delimited_str(grouping_cols)
    validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
                           edge_params, out_table, damping_factor, max_iter,
                           threshold, grouping_cols_list, 'PageRank')
    summary_table = out_table + "_summary"
    _assert(not table_exists(summary_table),
            "Graph PageRank: Output summary table ({summary_table}) already exists."
            .format(**locals()))
    src = edge_params["src"]
    dest = edge_params["dest"]
    nvertices = plpy.execute("""
        SELECT COUNT({0}) AS cnt
        FROM {1}
        """.format(vertex_id, vertex_table))[0]["cnt"]
    # A fixed threshold value, of say 1e-5, might not work well when the
    # number of vertices is a billion, since the initial pagerank value
    # of all nodes would then be 1/1e-9. So, assign default threshold
    # value based on number of nodes in the graph.
    # NOTE: The heuristic below is not based on any scientific evidence.
    if threshold is None:
        threshold = 1.0/(nvertices*100)

    # table/column names used when grouping_cols is set.
    distinct_grp_table = ''
    vertices_per_group = ''
    vpg = ''
    grouping_where_clause = ''
    group_by_clause = ''
    random_prob = ''

    # Work on a copy of the edge table, distributed (on GPDB/HAWQ) by the
    # destination column to co-locate rows for the per-dest aggregation.
    edge_temp_table = unique_string(desp='temp_edge')
    distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
                            <!"DISTRIBUTED BY ({0})".format(dest)!>)
    plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table))
    plpy.execute("""CREATE TEMP TABLE {edge_temp_table} AS
        SELECT * FROM {edge_table}
        {distribution}
        """.format(**locals()))
    # GPDB and HAWQ have distributed by clauses to help them with indexing.
    # For Postgres we add the index explicitly.
    sql_index = m4_ifdef(<!__POSTGRESQL__!>,
                         <!"""CREATE INDEX ON {edge_temp_table} ({src});
                         """.format(**locals())!>,
                         <!''!>)
    plpy.execute(sql_index)

    # Intermediate tables required.
    cur = unique_string(desp='cur')          # pagerank values of iteration i-1
    message = unique_string(desp='message')  # pagerank values of iteration i
    cur_unconv = unique_string(desp='cur_unconv')          # unconverged before this iteration
    message_unconv = unique_string(desp='message_unconv')  # still unconverged after it
    out_cnts = unique_string(desp='out_cnts')  # out-degree of every vertex
    out_cnts_cnt = unique_string(desp='cnt')   # name of the out-degree column
    v1 = unique_string(desp='v1')              # alias for a second join on cur

    cnts_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
                                 <!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
    cur_join_clause = """{edge_temp_table}.{dest}={cur}.{vertex_id}
        """.format(**locals())
    out_cnts_join_clause = """{out_cnts}.{vertex_id}={edge_temp_table}.{src}
        """.format(**locals())
    v1_join_clause = """{v1}.{vertex_id}={edge_temp_table}.{src}
        """.format(**locals())

    random_probability = (1.0-damping_factor)/nvertices
    ######################################################################
    # Create several strings that will be used to construct required
    # queries. These strings will be required only during grouping.
    random_jump_prob = random_probability
    ignore_group_clause_first = ''
    limit = ' LIMIT 1 '

    grouping_cols_select_pr = ''
    vertices_per_group_inner_join_pr = ''
    ignore_group_clause_pr = ''

    grouping_cols_select_ins = ''
    vpg_from_clause_ins = ''
    vpg_where_clause_ins = ''
    message_grp_where_ins = ''
    ignore_group_clause_ins = ''

    grouping_cols_select_conv = '{0}.{1}'.format(cur, vertex_id)
    group_by_grouping_cols_conv = ''
    message_grp_clause_conv = ''
    ignore_group_clause_conv = ''
    ######################################################################

    # Queries when groups are involved need a lot more conditions in
    # various clauses, so populating the required variables. Some intermediate
    # tables are unnecessary when no grouping is involved, so create some
    # tables and certain columns only during grouping.
    if grouping_cols:
        distinct_grp_table = unique_string(desp='grp')
        plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
        plpy.execute("""CREATE TEMP TABLE {distinct_grp_table} AS
            SELECT DISTINCT {grouping_cols} FROM {edge_table}
            """.format(**locals()))
        vertices_per_group = unique_string(desp='nvert_grp')
        init_pr = unique_string(desp='init')
        random_prob = unique_string(desp='rand')
        subq = unique_string(desp='subquery')
        rand_damp = 1-damping_factor
        grouping_where_clause = ' AND '.join(
            [distinct_grp_table+'.'+col+'='+subq+'.'+col
             for col in grouping_cols_list])
        group_by_clause = ', '.join([distinct_grp_table+'.'+col
                                     for col in grouping_cols_list])
        # Find number of vertices in each group, this is the normalizing factor
        # for computing the random_prob
        plpy.execute("DROP TABLE IF EXISTS {0}".format(vertices_per_group))
        # The vertex set of each group is taken as the union of the src and
        # dest columns of that group's edges (not the global vertex table).
        plpy.execute("""CREATE TEMP TABLE {vertices_per_group} AS
            SELECT {distinct_grp_table}.*,
            1/COUNT(__vertices__)::DOUBLE PRECISION AS {init_pr},
            {rand_damp}/COUNT(__vertices__)::DOUBLE PRECISION AS {random_prob}
            FROM {distinct_grp_table} INNER JOIN (
                SELECT {grouping_cols}, {src} AS __vertices__
                FROM {edge_table}
                UNION
                SELECT {grouping_cols}, {dest} FROM {edge_table}
            ){subq}
            ON {grouping_where_clause}
            GROUP BY {group_by_clause}
            """.format(**locals()))

        grouping_where_clause = ' AND '.join(
            [vertices_per_group+'.'+col+'='+subq+'.'+col
             for col in grouping_cols_list])
        group_by_clause = ', '.join([vertices_per_group+'.'+col
                                     for col in grouping_cols_list])
        # Initial pagerank of every vertex in a group is 1/(group size).
        plpy.execute("""
            CREATE TEMP TABLE {cur} AS
            SELECT {group_by_clause}, {subq}.__vertices__ as {vertex_id},
                {init_pr} AS pagerank
            FROM {vertices_per_group} INNER JOIN (
                SELECT {grouping_cols}, {src} AS __vertices__
                FROM {edge_table}
                UNION
                SELECT {grouping_cols}, {dest} FROM {edge_table}
            ){subq}
            ON {grouping_where_clause}
            """.format(**locals()))
        vpg = unique_string(desp='vpg')
        # Compute the out-degree of every node in the group-based subgraphs.
        plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
        plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
            SELECT {grouping_cols_select} {src} AS {vertex_id},
                COUNT({dest}) AS {out_cnts_cnt}
            FROM {edge_table}
            GROUP BY {grouping_cols_select} {src}
            {cnts_distribution}
            """.format(grouping_cols_select=grouping_cols+','
                       if grouping_cols else '', **locals()))

        message_grp = ' AND '.join(
            ["{cur}.{col}={message}.{col}".format(**locals())
             for col in grouping_cols_list])
        cur_join_clause = cur_join_clause + ' AND ' + ' AND '.join(
            ["{edge_temp_table}.{col}={cur}.{col}".format(**locals())
             for col in grouping_cols_list])
        out_cnts_join_clause = out_cnts_join_clause + ' AND ' + ' AND '.join(
            ["{edge_temp_table}.{col}={out_cnts}.{col}".format(**locals())
             for col in grouping_cols_list])
        v1_join_clause = v1_join_clause + ' AND ' + ' AND '.join(
            ["{edge_temp_table}.{col}={v1}.{col}".format(**locals())
             for col in grouping_cols_list])
        vpg_join_clause = ' AND '.join(
            ["{edge_temp_table}.{col}={vpg}.{col}".format(**locals())
             for col in grouping_cols_list])
        vpg_cur_join_clause = ' AND '.join(
            ["{cur}.{col}={vpg}.{col}".format(**locals())
             for col in grouping_cols_list])
        # join clause specific to populating random_prob for nodes without any
        # incoming edges.
        edge_grouping_cols_select = ', '.join(
            ["{edge_temp_table}.{col}".format(**locals())
             for col in grouping_cols_list])
        cur_grouping_cols_select = ', '.join(
            ["{cur}.{col}".format(**locals()) for col in grouping_cols_list])
        # Create output summary table:
        # Grouping-column types are looked up from the edge table so the
        # summary/output tables match them exactly.
        cols_names_types = get_cols_and_types(edge_table)
        grouping_cols_clause = ', '.join([c_name+" "+c_type
                                          for (c_name, c_type) in cols_names_types
                                          if c_name in grouping_cols_list])
        plpy.execute("""
            CREATE TABLE {summary_table} (
                {grouping_cols_clause},
                __iterations__ INTEGER
            )
            """.format(**locals()))
        # Create output table. This will be updated whenever a group converges
        # Note that vertex_id is assumed to be an integer (as described in
        # documentation)
        plpy.execute("""
            CREATE TABLE {out_table} (
                {grouping_cols_clause},
                {vertex_id} INTEGER,
                pagerank DOUBLE PRECISION
            )
            """.format(**locals()))
        temp_summary_table = unique_string(desp='temp_summary')
        plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_summary_table))
        plpy.execute("""
            CREATE TABLE {temp_summary_table} (
                {grouping_cols_clause}
            )
            """.format(**locals()))
        ######################################################################
        # Strings required for the main PageRank computation query
        grouping_cols_select_pr = edge_grouping_cols_select+', '
        random_jump_prob = 'MIN({vpg}.{random_prob})'.format(**locals())
        vertices_per_group_inner_join_pr = """INNER JOIN {vertices_per_group}
            AS {vpg} ON {vpg_join_clause}""".format(**locals())
        ignore_group_clause_pr = ' WHERE '+get_ignore_groups(summary_table,
            edge_temp_table, grouping_cols_list)
        # Strings required for updating PageRank scores of vertices that have
        # no incoming edges
        grouping_cols_select_ins = cur_grouping_cols_select+','
        vpg_from_clause_ins = ', {vertices_per_group} AS {vpg}'.format(
            **locals())
        vpg_where_clause_ins = '{vpg_cur_join_clause} AND '.format(
            **locals())
        message_grp_where_ins = 'WHERE {message_grp}'.format(**locals())
        ignore_group_clause_ins = ' AND '+get_ignore_groups(summary_table,
            cur, grouping_cols_list)
        # Strings required for convergence test query
        grouping_cols_select_conv = cur_grouping_cols_select
        group_by_grouping_cols_conv = ' GROUP BY {0}'.format(
            cur_grouping_cols_select)
        message_grp_clause_conv = '{0} AND '.format(message_grp)
        ignore_group_clause_conv = ' AND '+get_ignore_groups(summary_table,
            cur, grouping_cols_list)
        # No LIMIT during grouping: every unconverged group must be listed.
        limit = ''
    else:
        # cur and out_cnts tables can be simpler when no grouping is involved.
        init_value = 1.0/nvertices
        plpy.execute("""
            CREATE TEMP TABLE {cur} AS
            SELECT {vertex_id}, {init_value}::DOUBLE PRECISION AS pagerank
            FROM {vertex_table}
            """.format(**locals()))

        # Compute the out-degree of every node in the graph.
        plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
        plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
            SELECT {src} AS {vertex_id}, COUNT({dest}) AS {out_cnts_cnt}
            FROM {edge_table}
            GROUP BY {src}
            {cnts_distribution}
            """.format(**locals()))

        # The summary table when there is no grouping will contain only
        # the iteration column. We don't need to create the out_table
        # when no grouping is used since the 'cur' table will be renamed
        # to out_table after pagerank computation is completed.
        plpy.execute("""
            CREATE TABLE {summary_table} (
                __iterations__ INTEGER
            )
            """.format(**locals()))
    unconverged = 0
    iteration_num = 0
    for iteration_num in range(max_iter):
        #####################################################################
        # PageRank for node 'A' at any given iteration 'i' is given by:
        # PR_i(A) = damping_factor(PR_i-1(B)/degree(B) +
        #           PR_i-1(C)/degree(C) + ...) + (1-damping_factor)/N
        # where 'N' is the number of vertices in the graph,
        # B, C are nodes that have edges to node A, and
        # degree(node) represents the number of outgoing edges from 'node'
        #####################################################################
        # Essentially, the pagerank for a node is based on an aggregate of a
        # fraction of the pagerank values of all the nodes that have incoming
        # edges to it, along with a small random probability.
        # More information can be found at:
        # https://en.wikipedia.org/wiki/PageRank#Damping_factor

        # The query below computes the PageRank of each node using the above
        # formula. A small explanatory note on ignore_group_clause:
        # This is used only when grouping is set. This essentially will have
        # the condition that will help skip the PageRank computation on groups
        # that have converged.
        plpy.execute("""
            CREATE TABLE {message} AS
            SELECT {grouping_cols_select_pr} {edge_temp_table}.{dest} AS {vertex_id},
                SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_jump_prob} AS pagerank
            FROM {edge_temp_table}
                INNER JOIN {cur} ON {cur_join_clause}
                INNER JOIN {out_cnts} ON {out_cnts_join_clause}
                INNER JOIN {cur} AS {v1} ON {v1_join_clause}
                {vertices_per_group_inner_join_pr}
            {ignore_group_clause}
            GROUP BY {grouping_cols_select_pr} {edge_temp_table}.{dest}
            """.format(ignore_group_clause=ignore_group_clause_pr
                       if iteration_num > 0 else ignore_group_clause_first,
                       **locals()))
        # If there are nodes that have no incoming edges, they are not
        # captured in the message table. Insert entries for such nodes,
        # with random_prob.
        plpy.execute("""
            INSERT INTO {message}
            SELECT {grouping_cols_select_ins} {cur}.{vertex_id},
                {random_jump_prob} AS pagerank
            FROM {cur} {vpg_from_clause_ins}
            WHERE {vpg_where_clause_ins} {vertex_id} NOT IN (
                SELECT {vertex_id}
                FROM {message}
                {message_grp_where_ins}
            )
            {ignore_group_clause}
            GROUP BY {grouping_cols_select_ins} {cur}.{vertex_id}
            """.format(ignore_group_clause=ignore_group_clause_ins
                       if iteration_num > 0 else ignore_group_clause_first,
                       **locals()))

        # Check for convergence:
        ## Check for convergence only if threshold != 0.
        if threshold != 0:
            # message_unconv and cur_unconv will contain the unconverged groups
            # after current # and previous iterations respectively. Groups that
            # are missing in message_unconv but appear in cur_unconv are the
            # groups that have converged after this iteration's computations.
            # If no grouping columns are specified, then we check if there is
            # at least one unconverged node (limit 1 is used in the query).
            # NOTE: with grouping, ignore_group_clause_ins and
            # ignore_group_clause_conv are built from the same tables, so the
            # two arms of the conditional below are equivalent.
            plpy.execute("""
                CREATE TEMP TABLE {message_unconv} AS
                SELECT {grouping_cols_select_conv}
                FROM {message}
                INNER JOIN {cur}
                ON {cur}.{vertex_id}={message}.{vertex_id}
                WHERE {message_grp_clause_conv}
                    ABS({cur}.pagerank-{message}.pagerank) > {threshold}
                {ignore_group_clause}
                {group_by_grouping_cols_conv}
                {limit}
                """.format(ignore_group_clause=ignore_group_clause_ins
                           if iteration_num > 0 else ignore_group_clause_conv,
                           **locals()))
            unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
                """.format(message_unconv))[0]["cnt"]
            if iteration_num > 0 and grouping_cols:
                # Update result and summary tables for groups that have
                # converged
                # since the last iteration.
                update_result_tables(temp_summary_table, iteration_num,
                                     summary_table, out_table, message,
                                     grouping_cols_list, cur_unconv,
                                     message_unconv)
            plpy.execute("DROP TABLE IF EXISTS {0}".format(cur_unconv))
            plpy.execute("""ALTER TABLE {message_unconv} RENAME TO
                {cur_unconv} """.format(**locals()))
        else:
            # Do not run convergence test if threshold=0, since that implies
            # the user wants to run max_iter iterations.
            unconverged = 1
        # This iteration's scores become the previous scores of the next one.
        plpy.execute("DROP TABLE IF EXISTS {0}".format(cur))
        plpy.execute("""ALTER TABLE {message} RENAME TO {cur}
            """.format(**locals()))
        if unconverged == 0:
            break

    # If there still are some unconverged groups/(entire table),
    # update results.
    if grouping_cols:
        if unconverged > 0:
            if threshold != 0:
                # We completed max_iters, but there are still some unconverged
                # groups # Update the result and summary tables for unconverged
                # groups.
                update_result_tables(temp_summary_table, iteration_num,
                                     summary_table, out_table, cur,
                                     grouping_cols_list, cur_unconv)
            else:
                # No group has converged. List of all group values are in
                # distinct_grp_table.
                update_result_tables(temp_summary_table, iteration_num,
                                     summary_table, out_table, cur,
                                     grouping_cols_list, distinct_grp_table)
    else:
        # Without grouping the working table simply becomes the output table.
        plpy.execute("""ALTER TABLE {table_name} RENAME TO {out_table}
            """.format(table_name=cur, **locals()))
        plpy.execute("""
            INSERT INTO {summary_table} VALUES
            ({iteration_num}+1);
            """.format(**locals()))

    ## Step 4: Cleanup
    plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5};
        """.format(out_cnts, edge_temp_table, cur, message, cur_unconv,
                   message_unconv))
    if grouping_cols:
        plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2};
            """.format(vertices_per_group, temp_summary_table,
                       distinct_grp_table))
    # Restore the caller's original message level.
    plpy.execute("SET client_min_messages TO %s" % old_msg_level)
| |
def update_result_tables(temp_summary_table, i, summary_table, out_table,
                         res_table, grouping_cols_list, cur_unconv,
                         message_unconv=None):
    """
    Flush newly-converged groups into the summary and output tables.

    A group counts as newly converged when it appears in cur_unconv
    (unconverged before this iteration) but not in message_unconv (still
    unconverged after it); cur_unconv is always a superset of
    message_unconv, so the set difference is exactly the groups that
    converged this iteration.  When message_unconv is None (max_iter was
    exhausted before convergence), every group of cur_unconv is flushed
    instead.
    """
    plpy.execute("TRUNCATE TABLE {0}".format(temp_summary_table))
    if message_unconv is None:
        # Called after max_iter completed without convergence: take all the
        # remaining unconverged groups from cur_unconv (message_unconv was
        # renamed to cur_unconv before the unconverged==0 check in the
        # pagerank loop).
        plpy.execute("""
            INSERT INTO {temp_summary_table}
            SELECT * FROM {cur_unconv}
            """.format(**locals()))
    else:
        # Keep only the cur_unconv groups that are absent from
        # message_unconv, i.e. the ones that converged this iteration.
        converged_filter = get_ignore_groups(
            message_unconv, cur_unconv, grouping_cols_list)
        plpy.execute("""
            INSERT INTO {temp_summary_table}
            SELECT {cur_unconv}.*
            FROM {cur_unconv}
            WHERE {converged_filter}
            """.format(**locals()))
    # Record how many iterations those groups needed (1-based).
    plpy.execute("""
        INSERT INTO {summary_table}
        SELECT *, {i}+1 AS __iteration__
        FROM {temp_summary_table}
        """.format(**locals()))
    # Copy the final pagerank rows of the flushed groups into the output.
    group_match = ' AND '.join(
        '{0}.{2}={1}.{2}'.format(res_table, temp_summary_table, col)
        for col in grouping_cols_list)
    plpy.execute("""
        INSERT INTO {out_table}
        SELECT {res_table}.*
        FROM {res_table}
        INNER JOIN {temp_summary_table}
        ON {group_match}
        """.format(**locals()))
| |
def get_ignore_groups(first_table, second_table, grouping_cols_list):
    """
    Return a boolean SQL fragment that keeps only the grouping-column
    combinations present in second_table but absent from first_table.
    """
    qualified_cols = ', '.join('{0}.{1}'.format(second_table, col)
                               for col in grouping_cols_list)
    plain_cols = ', '.join(grouping_cols_list)
    return """({second_table_cols}) NOT IN (SELECT {grouping_cols} FROM
        {first_table}) """.format(second_table_cols=qualified_cols,
                                  grouping_cols=plain_cols,
                                  first_table=first_table)
| |
def pagerank_help(schema_madlib, message, **kwargs):
    """
    Help function for pagerank

    Args:
        @param schema_madlib  Schema in which MADlib is installed
        @param message: string, Help message string ('usage'/'help'/'?',
                        'example'/'examples', or anything else for a summary)
        @param kwargs

    Returns:
        String. Help/usage information
    """
    # Fixes vs. previous revision: removed the dead placeholder assignment
    # ("Get from method below"), corrected user-visible typos
    # ("Vertix" -> "Vertex", "delimted" -> "delimited"), and flattened the
    # nested else/if into elif.
    if message is not None and message.lower() in ("usage", "help", "?"):
        help_string = get_graph_usage(schema_madlib, 'PageRank',
            """out_table       TEXT, -- Name of the output table for PageRank
    damping_factor  DOUBLE PRECISION, -- Damping factor in random surfer model
                                      -- (DEFAULT = 0.85)
    max_iter        INTEGER, -- Maximum iteration number (DEFAULT = 100)
    threshold       DOUBLE PRECISION, -- Stopping criteria (DEFAULT = 1/(N*100),
                                      -- N is number of vertices in the graph)
    grouping_col    TEXT -- Comma separated column names to group on
                         -- (DEFAULT = NULL, no grouping)
""") + """

A summary table is also created that contains information regarding the
number of iterations required for convergence. It is named by adding the
suffix '_summary' to the 'out_table' parameter.
"""
    elif message is not None and message.lower() in ("example", "examples"):
        help_string = """
----------------------------------------------------------------------------
                                EXAMPLES
----------------------------------------------------------------------------
-- Create a graph, represented as vertex and edge tables.
DROP TABLE IF EXISTS vertex, edge;
CREATE TABLE vertex(
        id INTEGER
        );
CREATE TABLE edge(
        src INTEGER,
        dest INTEGER,
        user_id INTEGER
        );
INSERT INTO vertex VALUES
(0),
(1),
(2),
(3),
(4),
(5),
(6);
INSERT INTO edge VALUES
(0, 1, 1),
(0, 2, 1),
(0, 4, 1),
(1, 2, 1),
(1, 3, 1),
(2, 3, 1),
(2, 5, 1),
(2, 6, 1),
(3, 0, 1),
(4, 0, 1),
(5, 6, 1),
(6, 3, 1),
(0, 1, 2),
(0, 2, 2),
(0, 4, 2),
(1, 2, 2),
(1, 3, 2),
(2, 3, 2),
(3, 0, 2),
(4, 0, 2),
(5, 6, 2),
(6, 3, 2);

-- Compute the PageRank:
DROP TABLE IF EXISTS pagerank_out, pagerank_out_summary;
SELECT madlib.pagerank(
    'vertex',             -- Vertex table
    'id',                 -- Vertex id column
    'edge',               -- Edge table
    'src=src, dest=dest', -- Comma delimited string of edge arguments
    'pagerank_out');      -- Output table of PageRank

-- View the PageRank of all vertices, sorted by their scores.
SELECT * FROM pagerank_out ORDER BY pagerank DESC;
-- View the summary table to find the number of iterations required for
-- convergence.
SELECT * FROM pagerank_out_summary;

-- Compute PageRank of nodes associated with each user:
DROP TABLE IF EXISTS pagerank_out, pagerank_out_summary;
SELECT madlib.pagerank(
    'vertex',             -- Vertex table
    'id',                 -- Vertex id column
    'edge',               -- Edge table
    'src=src, dest=dest', -- Comma delimited string of edge arguments
    'pagerank_out',       -- Output table of PageRank
    NULL,                 -- Default damping factor
    NULL,                 -- Default max_iter
    0.00000001,           -- Threshold
    'user_id');           -- Grouping column

-- View the PageRank of all vertices, sorted by their scores.
SELECT * FROM pagerank_out ORDER BY user_id, pagerank DESC;
-- View the summary table to find the number of iterations required for
-- convergence for each group.
SELECT * FROM pagerank_out_summary;
"""
    else:
        help_string = """
----------------------------------------------------------------------------
                                SUMMARY
----------------------------------------------------------------------------
Given a directed graph, pagerank algorithm finds the PageRank score of all
the vertices in the graph.
--
For an overview on usage, run:
SELECT {schema_madlib}.pagerank('usage');

For some examples, run:
SELECT {schema_madlib}.pagerank('example')
--
"""

    return help_string.format(schema_madlib=schema_madlib)
| # --------------------------------------------------------------------- |