| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| # PageRank |
| |
| # Please refer to the pagerank.sql_in file for the documentation |
| |
| """ |
| @file pagerank.py_in |
| |
| @namespace graph |
| """ |
| |
| import plpy |
| from graph_utils import get_graph_usage |
| from graph_utils import get_default_threshold_for_link_analysis |
| from graph_utils import update_output_grouping_tables_for_link_analysis |
| from graph_utils import validate_graph_coding |
| from graph_utils import validate_output_and_summary_tables |
| from graph_utils import validate_params_for_link_analysis |
| |
| from utilities.control import MinWarning |
| from utilities.control import OptimizerControl |
| from utilities.utilities import _assert |
| from utilities.utilities import _check_groups |
| from utilities.utilities import get_filtered_cols_subquery_str |
| from utilities.utilities import get_table_qualified_col_str |
| from utilities.utilities import add_postfix |
| from utilities.utilities import extract_keyvalue_params |
| from utilities.utilities import unique_string, split_quoted_delimited_str |
| from utilities.utilities import is_platform_pg |
| from utilities.utilities import py_list_to_sql_string |
| |
| from utilities.validate_args import columns_exist_in_table, get_cols_and_types |
| from utilities.validate_args import table_exists |
| from utilities.utilities import rename_table |
| |
| |
def validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
                           edge_params, out_table, damping_factor, max_iter,
                           threshold, grouping_cols_list, personalization_vertices):
    """
    Function to validate input parameters for PageRank

    Args:
        @param schema_madlib: str, Name of the MADlib schema
        @param vertex_table: str, Table containing the vertices of the graph
        @param vertex_id: str, Column name of the vertex id
        @param edge_table: str, Table containing the edges of the graph
        @param edge_params: dict, Mapping with the 'src' and 'dest' column names
        @param out_table: str, Name of the output table to be created
        @param damping_factor: float, Must lie in the closed interval [0, 1]
        @param max_iter: int, Maximum number of iterations
        @param threshold: float, Convergence threshold
        @param grouping_cols_list: list, Grouping column names (may be empty)
        @param personalization_vertices: list, Vertices for personalized
            PageRank (None/empty for regular PageRank)

    Raises an error (via plpy.error/_assert) when any parameter is invalid.
    """
    validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
                          out_table, 'PageRank')
    # Validate args such as threshold and max_iter
    validate_params_for_link_analysis(schema_madlib, "PageRank",
                                      threshold, max_iter,
                                      edge_table, grouping_cols_list)
    # Damping factor must lie in [0, 1] (inclusive on both ends).
    _assert(0.0 <= damping_factor <= 1.0,
            "PageRank: Invalid damping factor value ({0}), must be between 0 and 1.".
            format(damping_factor))

    # Validate against the given set of nodes for Personalized Page Rank
    if personalization_vertices:
        grouping_cols = get_table_qualified_col_str(
            edge_table, grouping_cols_list)
        group_by_clause = "GROUP BY {0}".format(grouping_cols) \
            if grouping_cols_list else ''
        src = edge_params["src"]
        dest = edge_params["dest"]
        input_personalization_vertices_length = len(personalization_vertices)

        personalization_vertices_str = ','.join([str(i) for i in personalization_vertices])

        # Get a list which has the number of personalization nodes of each group
        vertices_count_list_by_group = plpy.execute("""
                SELECT count(distinct {vertex_id}) AS count
                FROM {vertex_table}
                RIGHT JOIN {edge_table}
                ON ({vertex_table}.{vertex_id} = {edge_table}.{src}
                    OR {vertex_table}.{vertex_id} = {edge_table}.{dest})
                AND {vertex_table}.{vertex_id} = ANY(ARRAY[{personalization_vertices_str}])
                {group_by_clause}
            """.format(**locals()))

        # The number of personalization nodes for every group should be equal to
        # the number given by input personalization_vertices list. Otherwise,
        # some nodes are missing for certain group. Or there might be duplicate
        # nodes in input personalization_vertices list. Or there are some
        # invalid nodes in input list that don't exist in vertex table. In any
        # case, throw an error.
        for key in vertices_count_list_by_group:
            if key["count"] != input_personalization_vertices_length:
                plpy.error("Personalization nodes must be a subset "
                           "of the vertex_table without duplicates and "
                           "every node must be present in all the groups")
| |
| |
def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_table,
             damping_factor, max_iter, threshold, grouping_cols, personalization_vertices, **kwargs):
    """
    Function that computes the PageRank

    Args:
        @param schema_madlib: Name of the MADlib schema.
        @param vertex_table: Table containing the vertices of the graph.
        @param vertex_id: Column name of the vertex id (defaults to 'id' when
            NULL; assumed to be an integer column, see the output table DDL).
        @param edge_table: Table containing the edges of the graph.
        @param edge_args: Comma-separated key=value string naming the 'src'
            and 'dest' columns of the edge table (defaults: src, dest).
        @param out_table: Name of the output table to create; a companion
            '<out_table>_summary' table is created as well.
        @param damping_factor: Probability of following an outgoing edge in
            the random-surfer model (DEFAULT 0.85).
        @param max_iter: Maximum number of iterations (DEFAULT 100).
        @param threshold: Convergence threshold; 0 disables the convergence
            test so all max_iter iterations run, NULL selects a default that
            depends on the number of vertices.
        @param grouping_cols: Comma-separated columns to group on; PageRank is
            computed independently on each group's subgraph (NULL/'' = none).
        @param personalization_vertices: List of vertices for personalized
            PageRank; NULL/empty computes regular PageRank.
    """
    # We noticed the current implementation is noticeably slower when orca=ON
    # on Greenplum 5, so that we temporarily turn it off to run pagerank.
    with OptimizerControl(False):

        with MinWarning('warning'):
            params_types = {'src': str, 'dest': str}
            default_args = {'src': 'src', 'dest': 'dest'}
            edge_params = extract_keyvalue_params(
                edge_args, params_types, default_args)

            # populate default values for optional params if null
            if damping_factor is None:
                damping_factor = 0.85
            if max_iter is None:
                max_iter = 100
            if not vertex_id:
                vertex_id = "id"
            if not grouping_cols:
                grouping_cols = ''

            grouping_cols_list = split_quoted_delimited_str(grouping_cols)
            validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
                                   edge_params, out_table, damping_factor,
                                   max_iter, threshold, grouping_cols_list,
                                   personalization_vertices)

            summary_table = add_postfix(out_table, "_summary")
            _assert(not table_exists(summary_table),
                    "Graph PageRank: Output summary table ({summary_table}) already exists."
                    .format(**locals()))
            src = edge_params["src"]
            dest = edge_params["dest"]
            # Total vertex count: normalizes the initial and random-jump
            # probabilities in the non-grouping, non-personalized case.
            n_vertices = plpy.execute("""
                        SELECT COUNT({0}) AS cnt
                        FROM {1}
                    """.format(vertex_id, vertex_table))[0]["cnt"]

            if threshold is None:
                threshold = get_default_threshold_for_link_analysis(n_vertices)

            # table/column names used when grouping_cols is set.
            distinct_grp_table = ''
            vertices_per_group = ''
            vpg = ''
            grouping_where_clause = ''
            group_by_clause = ''
            random_prob = ''
            ppr_join_clause = ''  # NOTE(review): appears unused in this function

            edge_temp_table = unique_string(desp='temp_edge')
            grouping_cols_comma = grouping_cols + ',' if grouping_cols else ''
            # Copy the edge table, distributed by (grouping cols,) dest on
            # Greenplum so the per-iteration join on dest avoids motion.
            distribution = ('' if is_platform_pg() else
                            "DISTRIBUTED BY ({0}{1})".format(
                                grouping_cols_comma, dest))
            plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table))
            plpy.execute("""
                CREATE TEMP TABLE {edge_temp_table} AS
                    SELECT * FROM {edge_table}
                    {distribution}
                """.format(**locals()))

            # GPDB has "distributed by" clauses to help with indexing.
            # For Postgres we add the index explicitly.
            if is_platform_pg():
                plpy.execute("CREATE INDEX ON {0}({1})".format(
                    edge_temp_table, src))

            # Intermediate tables required.
            cur = unique_string(desp='cur')
            message = unique_string(desp='message')
            cur_unconv = unique_string(desp='cur_unconv')
            message_unconv = unique_string(desp='message_unconv')
            out_cnts = unique_string(desp='out_cnts')
            out_cnts_cnt = unique_string(desp='cnt')
            v1 = unique_string(desp='v1')

            if is_platform_pg():
                cur_distribution = cnts_distribution = ''
            else:
                cur_distribution = cnts_distribution = "DISTRIBUTED BY ({0}{1})".format(
                    grouping_cols_comma, vertex_id)
            cur_join_clause = """{edge_temp_table}.{dest} = {cur}.{vertex_id}
                """.format(**locals())
            out_cnts_join_clause = """{out_cnts}.{vertex_id} =
                {edge_temp_table}.{src} """.format(**locals())
            v1_join_clause = """{v1}.{vertex_id} = {edge_temp_table}.{src}
                """.format(**locals())

            # Get query params for Personalized Page Rank.
            ppr_params = ''
            total_ppr_nodes = 0
            random_jump_prob_ppr = 0
            ppr_init_value_clause = ''
            if personalization_vertices:
                ppr_params = get_query_params_for_ppr(personalization_vertices, damping_factor,
                                                      vertex_id, edge_temp_table, vertex_table, edge_params)
                total_ppr_nodes = ppr_params[0]
                random_jump_prob_ppr = ppr_params[1]
                ppr_init_value_clause = ppr_params[2]

            # (1 - d) / N : random-jump term of the classic PageRank formula.
            random_probability = (1.0 - damping_factor) / n_vertices
            if total_ppr_nodes > 0:
                random_jump_prob = random_jump_prob_ppr
            else:
                random_jump_prob = random_probability

            ####################################################################
            # Create several strings that will be used to construct required
            # queries. These strings will be required only during grouping.

            ignore_group_clause_first = ''
            limit = ' LIMIT 1 '

            grouping_cols_select_pr = ''
            vertices_per_group_inner_join_pr = ''
            ignore_group_clause_pr = ''

            grouping_cols_select_ins = ''
            vpg_from_clause_ins = ''
            vpg_where_clause_ins = ''
            message_grp_where_ins = ''
            ignore_group_clause_ins = ''

            nodes_with_no_incoming_edges = unique_string(desp='no_incoming')
            ignore_group_clause_ins_noincoming = ''

            grouping_cols_select_conv = '{0}.{1}'.format(cur, vertex_id)
            group_by_grouping_cols_conv = ''
            message_grp_clause_conv = ''
            ignore_group_clause_conv = ''

            ####################################################################

            # Queries when groups are involved need a lot more conditions in
            # various clauses, so populating the required variables. Some intermediate
            # tables are unnecessary when no grouping is involved, so create some
            # tables and certain columns only during grouping.
            if grouping_cols:
                distinct_grp_table = unique_string(desp='grp')
                plpy.execute(
                    "DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
                plpy.execute("""CREATE TEMP TABLE {distinct_grp_table} AS
                    SELECT DISTINCT {grouping_cols} FROM {edge_temp_table}
                    """.format(**locals()))
                vertices_per_group = unique_string(desp='nvert_grp')
                init_pr = unique_string(desp='init')
                random_prob = unique_string(desp='rand')
                subq = unique_string(desp='subquery')
                rand_damp = 1 - damping_factor
                grouping_where_clause = _check_groups(
                    distinct_grp_table, subq, grouping_cols_list)
                group_by_clause = get_table_qualified_col_str(
                    distinct_grp_table, grouping_cols_list)
                # Find number of vertices in each group, this is the normalizing
                # factor for computing the random_prob
                where_clause_ppr = ''
                if personalization_vertices:
                    personalization_vertices_str = ','.join(
                        [str(i) for i in personalization_vertices])
                    # For PPR, restrict to the personalization vertices and use
                    # constant probabilities instead of per-group counts.
                    where_clause_ppr = """
                        where __vertices__ = ANY(ARRAY[{personalization_vertices_str}])
                    """.format(**locals())
                    random_prob_grp = 1.0 - damping_factor
                    init_prob_grp = 1.0 / total_ppr_nodes
                else:
                    personalization_vertices_str = ''
                    random_prob_grp = """
                        {rand_damp}/COUNT(__vertices__)::DOUBLE PRECISION
                    """.format(**locals())
                    init_prob_grp = """
                        1/COUNT(__vertices__)::DOUBLE PRECISION
                    """.format(**locals())

                plpy.execute(
                    "DROP TABLE IF EXISTS {0}".format(vertices_per_group))
                # Vertices of a group = union of src and dest that appear in
                # that group's edges (the vertex table has no group columns).
                plpy.execute("""CREATE TEMP TABLE {vertices_per_group} AS
                    SELECT {distinct_grp_table}.*,
                    {init_prob_grp} AS {init_pr},
                    {random_prob_grp} as {random_prob}
                    FROM {distinct_grp_table} INNER JOIN (
                        SELECT {grouping_cols}, {src} AS __vertices__
                        FROM {edge_temp_table}
                        UNION
                        SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
                    ){subq}
                    ON {grouping_where_clause}
                    {where_clause_ppr}
                    GROUP BY {group_by_clause}
                    """.format(**locals()))

                grouping_where_clause = _check_groups(
                    vertices_per_group, subq, grouping_cols_list)
                group_by_clause = get_table_qualified_col_str(
                    vertices_per_group, grouping_cols_list)

                if personalization_vertices:
                    # PPR starts all mass on the personalization vertices;
                    # every other vertex starts at zero.
                    init_prob_grp_ppr = 1.0 / total_ppr_nodes
                    init_pr = """
                        CASE when __vertices__ = ANY(ARRAY[{personalization_vertices_str}])
                        THEN {init_prob_grp_ppr} ELSE 0 END
                    """.format(**locals())

                plpy.execute("""
                    CREATE TEMP TABLE {cur} AS
                    SELECT {group_by_clause}, {subq}.__vertices__
                    AS {vertex_id}, {init_pr} AS pagerank
                    FROM {vertices_per_group} INNER JOIN (
                        SELECT {grouping_cols}, {src} AS __vertices__
                        FROM {edge_temp_table}
                        UNION
                        SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
                    ){subq}
                    ON {grouping_where_clause}
                    {cur_distribution}
                    """.format(**locals()))
                vpg = unique_string(desp='vpg')

                # Compute the out-degree of every node in the group-based
                # subgraphs.
                plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
                plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
                    SELECT {grouping_cols_select} {src} AS {vertex_id},
                           COUNT({dest}) AS {out_cnts_cnt}
                    FROM {edge_temp_table}
                    GROUP BY {grouping_cols_select} {src}
                    {cnts_distribution}
                    """.format(grouping_cols_select=grouping_cols + ','
                               if grouping_cols else '', **locals()))

                message_grp = _check_groups(cur, message, grouping_cols_list)
                cur_join_clause = cur_join_clause + ' AND ' + _check_groups(
                    cur, edge_temp_table, grouping_cols_list)

                out_cnts_join_clause = out_cnts_join_clause + ' AND ' \
                    + _check_groups(out_cnts, edge_temp_table,
                                    grouping_cols_list)

                v1_join_clause = v1_join_clause + ' AND ' + _check_groups(
                    v1, edge_temp_table, grouping_cols_list)

                vpg_join_clause = _check_groups(
                    vpg, edge_temp_table, grouping_cols_list)
                vpg_t1_join_clause = _check_groups(
                    vpg, '__t1__', grouping_cols_list)

                # join clause specific to populating random_prob for nodes without any
                # incoming edges.
                edge_grouping_cols_select = get_table_qualified_col_str(
                    edge_temp_table, grouping_cols_list)
                cur_grouping_cols_select = get_table_qualified_col_str(
                    cur, grouping_cols_list)

                # Create output summary table:
                cols_names_types = get_cols_and_types(edge_table)
                grouping_cols_clause = ', '.join([c_name + " " + c_type
                                                  for (c_name, c_type)
                                                  in cols_names_types
                                                  if c_name in grouping_cols_list])
                plpy.execute("""
                    CREATE TABLE {summary_table} (
                        {grouping_cols_clause},
                        __iterations__ INTEGER
                    )
                    """.format(**locals()))
                # Create output table. This will be updated whenever a group converges
                # Note that vertex_id is assumed to be an integer (as described in
                # documentation)
                plpy.execute("""
                    CREATE TABLE {out_table} (
                        {grouping_cols_clause},
                        {vertex_id} BIGINT,
                        pagerank DOUBLE PRECISION
                    )
                    """.format(**locals()))
                temp_summary_table = unique_string(desp='temp_summary')
                plpy.execute(
                    "DROP TABLE IF EXISTS {0}".format(temp_summary_table))
                plpy.execute("""
                    CREATE TABLE {temp_summary_table} (
                        {grouping_cols_clause}
                    )
                    """.format(**locals()))
                ################################################################
                # Strings required for the main PageRank computation query
                grouping_cols_select_pr = edge_grouping_cols_select + ', '

                if personalization_vertices:
                    random_jump_prob = random_jump_prob_ppr
                else:
                    # MIN() is only needed to satisfy the GROUP BY; random_prob
                    # is constant within a group.
                    random_jump_prob = 'MIN({vpg}.{random_prob})'.format(
                        **locals())
                vertices_per_group_inner_join_pr = """
                    INNER JOIN {vertices_per_group}
                    AS {vpg} ON {vpg_join_clause}""".format(**locals())

                ignore_group_clause_pr = ' WHERE ' + \
                    get_filtered_cols_subquery_str(edge_temp_table,
                                                   summary_table,
                                                   grouping_cols_list)

                ignore_group_clause_ins_noincoming = ' WHERE ' + \
                    get_filtered_cols_subquery_str(nodes_with_no_incoming_edges,
                                                   summary_table,
                                                   grouping_cols_list)
                # Strings required for updating PageRank scores of vertices that have
                # no incoming edges
                grouping_cols_select_ins = cur_grouping_cols_select + ','
                vpg_from_clause_ins = ', {vertices_per_group} AS {vpg}'.format(
                    **locals())
                vpg_where_clause_ins = ' AND {vpg_t1_join_clause} '.format(
                    **locals())
                message_grp_where_ins = 'WHERE {message_grp}'.format(**locals())
                ignore_group_clause_ins = ' AND ' + get_filtered_cols_subquery_str(
                    cur, summary_table, grouping_cols_list)
                # Strings required for convergence test query
                grouping_cols_select_conv = cur_grouping_cols_select
                group_by_grouping_cols_conv = ' GROUP BY {0}'.format(
                    cur_grouping_cols_select)
                message_grp_clause_conv = '{0} AND '.format(message_grp)
                ignore_group_clause_conv = ' AND ' + get_filtered_cols_subquery_str(
                    cur, summary_table, grouping_cols_list)
                limit = ''

                # Find all nodes, in each group, that have no incoming edges. The PageRank
                # value of these nodes are not updated using the first query in the
                # following for loop. They must be explicitly plugged back in to the
                # message table, with their corresponding group's random_prob as their
                # PageRank values.
                plpy.execute("""
                        CREATE TABLE {nodes_with_no_incoming_edges} AS
                        SELECT {select_group_cols}, __t1__.{src} AS {vertex_id},
                               {vpg}.{random_prob} AS pagerank
                        FROM {edge_temp_table} AS __t1__ {vpg_from_clause_ins}
                        WHERE NOT EXISTS (
                            SELECT 1
                            FROM {edge_temp_table} AS __t2__
                            WHERE __t1__.{src}=__t2__.{dest}
                            AND {where_group_clause}
                        ) {vpg_where_clause_ins}
                        GROUP BY {select_group_cols}, __t1__.{src}, pagerank
                    """.format(
                    select_group_cols=get_table_qualified_col_str(
                        '__t1__', grouping_cols_list),
                    where_group_clause=_check_groups(
                        '__t1__', '__t2__', grouping_cols_list),
                    **locals()))

            else:
                # cur and out_cnts tables can be simpler when no grouping is
                # involved.
                if total_ppr_nodes > 0:
                    init_value = ppr_init_value_clause
                else:
                    init_value = 1.0 / n_vertices
                plpy.execute("""
                        CREATE TEMP TABLE {cur} AS
                        SELECT {vertex_id}, {init_value}::DOUBLE PRECISION AS pagerank
                        FROM {vertex_table}
                        {cur_distribution}
                    """.format(**locals()))

                # Compute the out-degree of every node in the graph.
                plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
                plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
                        SELECT {src} AS {vertex_id}, COUNT({dest}) AS {out_cnts_cnt}
                        FROM {edge_temp_table}
                        GROUP BY {src}
                        {cnts_distribution}
                    """.format(**locals()))
                # The summary table when there is no grouping will contain only
                # the iteration column. We don't need to create the out_table
                # when no grouping is used since the 'cur' table will be renamed
                # to out_table after pagerank computation is completed.
                plpy.execute("""
                        CREATE TABLE {summary_table} (
                            __iterations__ INTEGER
                        )
                    """.format(**locals()))
                # Find all nodes in the graph that don't have any incoming edges and
                # assign random_prob as their pagerank values.
                plpy.execute("""
                        CREATE TABLE {nodes_with_no_incoming_edges} AS
                        SELECT DISTINCT({src}), {random_probability} AS pagerank
                        FROM {edge_temp_table}
                        EXCEPT
                            (SELECT DISTINCT({dest}), {random_probability} AS pagerank
                            FROM {edge_temp_table})
                    """.format(**locals()))
            unconverged = 0
            iteration_num = 0

            for iteration_num in range(max_iter):
                ################################################################
                # PageRank for node 'A' at any given iteration 'i' is given by:
                # PR_i(A) = damping_factor(PR_i-1(B)/degree(B) +
                #           PR_i-1(C)/degree(C) + ...) + (1-damping_factor)/N
                # where 'N' is the number of vertices in the graph,
                # B, C are nodes that have edges to node A, and
                # degree(node) represents the number of outgoing edges from 'node'
                ################################################################
                # Essentially, the pagerank for a node is based on an aggregate of a
                # fraction of the pagerank values of all the nodes that have incoming
                # edges to it, along with a small random probability.
                # More information can be found at:
                # https://en.wikipedia.org/wiki/PageRank#Damping_factor

                # The query below computes the PageRank of each node using the above
                # formula. A small explanatory note on ignore_group_clause:
                # This is used only when grouping is set. This essentially will have
                # the condition that will help skip the PageRank computation on groups
                # that have converged.

                plpy.execute("""
                        CREATE TABLE {message} AS
                        SELECT {grouping_cols_select_pr}
                                {edge_temp_table}.{dest} AS {vertex_id},
                                SUM(({v1}.pagerank)/{out_cnts}.{out_cnts_cnt})*{damping_factor}+
                                {random_jump_prob} AS pagerank
                        FROM {edge_temp_table}
                            INNER JOIN {cur} ON {cur_join_clause}
                            INNER JOIN {out_cnts} ON {out_cnts_join_clause}
                            INNER JOIN {cur} AS {v1} ON {v1_join_clause}
                            {vertices_per_group_inner_join_pr}
                        {ignore_group_clause}
                        GROUP BY {grouping_cols_select_pr} {edge_temp_table}.{dest}
                        {cur_distribution}
                    """.format(ignore_group_clause=ignore_group_clause_pr
                               if iteration_num > 0 else ignore_group_clause_first,
                               **locals()))

                # If there are nodes that have no incoming edges, they are not
                # captured in the message table. Insert entries for such nodes,
                # with random_prob.
                plpy.execute("""
                        INSERT INTO {message}
                        SELECT *
                        FROM {nodes_with_no_incoming_edges}
                        {ignore_group_clause}
                    """.format(ignore_group_clause=ignore_group_clause_ins_noincoming
                               if iteration_num > 0 else ignore_group_clause_first,
                               **locals()))
                # Check for convergence:
                # Check for convergence only if threshold != 0.
                if threshold != 0:
                    # message_unconv and cur_unconv will contain the unconverged groups
                    # after current # and previous iterations respectively. Groups that
                    # are missing in message_unconv but appear in cur_unconv are the
                    # groups that have converged after this iteration's computations.
                    # If no grouping columns are specified, then we check if there is
                    # at least one unconverged node (limit 1 is used in the
                    # query).
                    if iteration_num == 0 and grouping_cols:
                        # Hack to address corner case:
                        # With grouping, if there was a graph that converged in
                        # the very first iteration (a complete graph is an eg.
                        # of such a graph), then the pagerank scores for that
                        # group was not showing up in the output. The following
                        # code just prevents convergence in the first iteration.
                        plpy.execute("""
                            CREATE TEMP TABLE {message_unconv} AS
                            SELECT * FROM {distinct_grp_table}
                        """.format(**locals()))
                    else:
                        plpy.execute("""
                                CREATE TEMP TABLE {message_unconv} AS
                                SELECT {grouping_cols_select_conv}
                                FROM {message}
                                INNER JOIN {cur}
                                ON {cur}.{vertex_id}={message}.{vertex_id}
                                WHERE {message_grp_clause_conv}
                                    ABS({cur}.pagerank-{message}.pagerank) > {threshold}
                                {ignore_group_clause}
                                {group_by_grouping_cols_conv}
                                {limit}
                            """.format(ignore_group_clause=ignore_group_clause_ins
                                       if iteration_num > 0 else ignore_group_clause_conv,
                                       **locals()))

                    unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
                        """.format(message_unconv))[0]["cnt"]
                    if iteration_num > 0 and grouping_cols:
                        # Update result and summary tables for groups that have
                        # converged
                        # since the last iteration.
                        update_output_grouping_tables_for_link_analysis(
                            temp_summary_table,
                            iteration_num,
                            summary_table,
                            out_table,
                            message,
                            grouping_cols_list,
                            cur_unconv,
                            message_unconv)

                    plpy.execute("DROP TABLE IF EXISTS {0}".format(cur_unconv))
                    plpy.execute("""ALTER TABLE {message_unconv} RENAME TO
                        {cur_unconv} """.format(**locals()))
                else:
                    # Do not run convergence test if threshold=0, since that implies
                    # the user wants to run max_iter iterations.
                    unconverged = 1

                plpy.execute("DROP TABLE IF EXISTS {0}".format(cur))
                plpy.execute("""ALTER TABLE {message} RENAME TO {cur}
                    """.format(**locals()))
                if unconverged == 0:
                    break

            # If there still are some unconverged groups/(entire table),
            # update results.
            if grouping_cols:
                if unconverged > 0:
                    if threshold != 0:
                        # We completed max_iters, but there are still some unconverged
                        # groups # Update the result and summary tables for unconverged
                        # groups.
                        update_output_grouping_tables_for_link_analysis(
                            temp_summary_table,
                            iteration_num,
                            summary_table,
                            out_table,
                            cur,
                            grouping_cols_list,
                            cur_unconv)
                    else:
                        # No group has converged. List of all group values are in
                        # distinct_grp_table.
                        update_output_grouping_tables_for_link_analysis(
                            temp_summary_table,
                            iteration_num,
                            summary_table,
                            out_table, cur,
                            grouping_cols_list,
                            distinct_grp_table)

                # updating the calculated pagerank value in case of
                # Personalized Page Rank.
                # Ref :
                # https://docs.oracle.com/cd/E56133_01/latest/reference/algorithms/pagerank.html
                if total_ppr_nodes > 1:
                    plpy.execute("""UPDATE {out_table} set pagerank =
                        pagerank / {total_ppr_nodes}::DOUBLE PRECISION
                        """.format(out_table=out_table,
                                   total_ppr_nodes=total_ppr_nodes))
            else:
                # updating the calculated pagerank value in case of
                # Personalized Page Rank.
                # Ref :
                # https://docs.oracle.com/cd/E56133_01/latest/reference/algorithms/pagerank.html
                if total_ppr_nodes > 1:
                    plpy.execute("""UPDATE {table_name} set pagerank =
                        pagerank / {total_ppr_nodes}::DOUBLE PRECISION
                        """.format(table_name=cur,
                                   total_ppr_nodes=total_ppr_nodes))

                rename_table(schema_madlib, cur, out_table)
                plpy.execute("""
                    INSERT INTO {summary_table} VALUES
                    ({iteration_num}+1)
                    """.format(**locals()))

            # Step 4: Cleanup
            plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5},{6}
                """.format(out_cnts, edge_temp_table, cur, message, cur_unconv,
                           message_unconv, nodes_with_no_incoming_edges))
            if grouping_cols:
                plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2}
                    """.format(vertices_per_group, temp_summary_table,
                               distinct_grp_table))
| |
| |
def get_query_params_for_ppr(personalization_vertices, damping_factor,
                             vertex_id, edge_temp_table, vertex_table, edge_params):
    """
    Build the SQL fragments needed for Personalized PageRank (PPR).

    Produces the number of user-supplied personalization vertices together
    with two SQL CASE expressions: one that assigns the random-jump
    probability only to the personalization vertices, and one that assigns
    the initial probability mass only to the personalization vertices.

    Args:
        @param personalization_vertices
        @param damping_factor
        @param vertex_id
        @param edge_temp_table
        @param vertex_table  (currently unused; kept for interface stability)
        @param edge_params

    Returns :
        (Integer, String, String)

    """
    # Non-personalized case: zero PPR nodes and empty SQL clauses.
    if not personalization_vertices:
        return (0, '', '')

    node_count = len(personalization_vertices)
    init_value = 1.0 / node_count
    jump_prob = 1.0 - damping_factor
    vertices_csv = ','.join(str(v) for v in personalization_vertices)

    # In case of PPR, assign the random jump probability to the
    # personalization vertices only; all other nodes get zero.
    random_prob_clause = """
        CASE WHEN {table}.{dest} = ANY(ARRAY[{csv}])
        THEN {prob} ELSE 0 END
        """.format(table=edge_temp_table, dest=edge_params["dest"],
                   csv=vertices_csv, prob=jump_prob)

    # Initial probability is split evenly across the personalization
    # vertices; all other nodes start at zero.
    init_prob_clause = """
        CASE WHEN {vid} = ANY(ARRAY[{csv}])
        THEN {init} ELSE 0 END
        """.format(vid=vertex_id, csv=vertices_csv, init=init_value)

    return (node_count, random_prob_clause, init_prob_clause)
| |
| |
def pagerank_help(schema_madlib, message, **kwargs):
    """
    Help function for pagerank

    Args:
        @param schema_madlib: str, Name of the MADlib schema
        @param message: string, Help message string
        @param kwargs

    Returns:
        String. Help/usage information
    """
    if message is not None and \
            message.lower() in ("usage", "help", "?"):
        # Fixed typo in user-facing text ("seperated" -> "separated") and
        # removed a dead placeholder assignment that was immediately
        # overwritten.
        help_string = get_graph_usage(schema_madlib, 'PageRank',
            """out_table       TEXT, -- Name of the output table for PageRank
    damping_factor  DOUBLE PRECISION, -- Damping factor in random surfer model
                                      -- (DEFAULT = 0.85)
    max_iter        INTEGER, -- Maximum iteration number (DEFAULT = 100)
    threshold       DOUBLE PRECISION, -- Stopping criteria (DEFAULT = 1/(N*1000),
                                      -- N is number of vertices in the graph)
    grouping_col    TEXT, -- Comma separated column names to group on
                          -- (DEFAULT = NULL, no grouping)
    personalization_vertices ARRAY OF BIGINT, -- A comma separated list of vertices
                                                 or nodes for personalized page rank.
""") + """

A summary table is also created that contains information regarding the
number of iterations required for convergence. It is named by adding the
suffix '_summary' to the 'out_table' parameter.
"""
    else:
        help_string = """
----------------------------------------------------------------------------
                                SUMMARY
----------------------------------------------------------------------------
Given a directed graph, pagerank algorithm finds the PageRank score of all
the vertices in the graph.
--
For an overview on usage, run:
SELECT {schema_madlib}.pagerank('usage');
"""

    return help_string.format(schema_madlib=schema_madlib)