# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# PageRank
# Please refer to the pagerank.sql_in file for the documentation
"""
@file pagerank.py_in
@namespace graph
"""
import plpy
from graph_utils import get_graph_usage
from graph_utils import get_default_threshold_for_link_analysis
from graph_utils import update_output_grouping_tables_for_link_analysis
from graph_utils import validate_graph_coding
from graph_utils import validate_output_and_summary_tables
from graph_utils import validate_params_for_link_analysis
from utilities.control import MinWarning
from utilities.control import OptimizerControl
from utilities.utilities import _assert
from utilities.utilities import _check_groups
from utilities.utilities import get_filtered_cols_subquery_str
from utilities.utilities import get_table_qualified_col_str
from utilities.utilities import add_postfix
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string, split_quoted_delimited_str
from utilities.utilities import is_platform_pg
from utilities.utilities import py_list_to_sql_string
from utilities.validate_args import columns_exist_in_table, get_cols_and_types
from utilities.validate_args import table_exists
from utilities.utilities import rename_table
def validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
edge_params, out_table, damping_factor, max_iter,
threshold, grouping_cols_list, personalization_vertices):
"""
Function to validate input parameters for PageRank
"""
validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
out_table, 'PageRank')
# Validate args such as threshold and max_iter
validate_params_for_link_analysis(schema_madlib, "PageRank",
threshold, max_iter,
edge_table, grouping_cols_list)
_assert(damping_factor >= 0.0 and damping_factor <= 1.0,
"PageRank: Invalid damping factor value ({0}), must be between 0 and 1.".
format(damping_factor))
# Validate against the given set of nodes for Personalized Page Rank
if personalization_vertices:
grouping_cols = get_table_qualified_col_str(
edge_table, grouping_cols_list)
group_by_clause = "GROUP BY {0}".format(grouping_cols) \
if grouping_cols_list else ''
src = edge_params["src"]
dest = edge_params["dest"]
input_personalization_vertices_length = len(personalization_vertices)
personalization_vertices_str = ','.join([str(i) for i in personalization_vertices])
# Get a list which has the number of personalization nodes of each group
vertices_count_list_by_group = plpy.execute("""
SELECT count(distinct {vertex_id}) AS count
FROM {vertex_table}
RIGHT JOIN {edge_table}
ON ({vertex_table}.{vertex_id} = {edge_table}.{src}
OR {vertex_table}.{vertex_id} = {edge_table}.{dest})
AND {vertex_table}.{vertex_id} = ANY(ARRAY[{personalization_vertices_str}])
{group_by_clause}
""".format(**locals()))
# The number of personalization nodes for every group should be equal to
# the number of nodes in the input personalization_vertices list.
# Otherwise, some nodes are missing from certain groups, the input
# personalization_vertices list contains duplicates, or the input list
# contains invalid nodes that don't exist in the vertex table. In any of
# these cases, throw an error.
for key in vertices_count_list_by_group:
if key["count"] != input_personalization_vertices_length:
plpy.error("Personalization nodes must be a subset "
"of the vertex_table without duplicates, and "
"every node must be present in all the groups")
def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_table,
damping_factor, max_iter, threshold, grouping_cols, personalization_vertices, **kwargs):
"""
Function that computes the PageRank
Args:
@param vertex_table
@param vertex_id
@param edge_table
@param edge_args
@param out_table
@param damping_factor
@param max_iter
@param threshold
@param grouping_cols
@param personalization_vertices
"""
# We noticed the current implementation is noticeably slower with orca=on
# on Greenplum 5, so we temporarily turn it off while running pagerank.
with OptimizerControl(False):
with MinWarning('warning'):
params_types = {'src': str, 'dest': str}
default_args = {'src': 'src', 'dest': 'dest'}
edge_params = extract_keyvalue_params(
edge_args, params_types, default_args)
# populate default values for optional params if null
if damping_factor is None:
damping_factor = 0.85
if max_iter is None:
max_iter = 100
if not vertex_id:
vertex_id = "id"
if not grouping_cols:
grouping_cols = ''
grouping_cols_list = split_quoted_delimited_str(grouping_cols)
validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
edge_params, out_table, damping_factor,
max_iter, threshold, grouping_cols_list,
personalization_vertices)
summary_table = add_postfix(out_table, "_summary")
_assert(not table_exists(summary_table),
"Graph PageRank: Output summary table ({summary_table}) already exists."
.format(**locals()))
src = edge_params["src"]
dest = edge_params["dest"]
n_vertices = plpy.execute("""
SELECT COUNT({0}) AS cnt
FROM {1}
""".format(vertex_id, vertex_table))[0]["cnt"]
if threshold is None:
threshold = get_default_threshold_for_link_analysis(n_vertices)
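# Illustrative note (an assumption based on the help text below, which
# documents the default threshold as 1/(N*1000)): with n_vertices = 1000,
# the default would be 1.0 / (1000 * 1000) = 1e-06.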
# table/column names used when grouping_cols is set.
distinct_grp_table = ''
vertices_per_group = ''
vpg = ''
grouping_where_clause = ''
group_by_clause = ''
random_prob = ''
ppr_join_clause = ''
edge_temp_table = unique_string(desp='temp_edge')
grouping_cols_comma = grouping_cols + ',' if grouping_cols else ''
distribution = ('' if is_platform_pg() else
"DISTRIBUTED BY ({0}{1})".format(
grouping_cols_comma, dest))
plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table))
plpy.execute("""
CREATE TEMP TABLE {edge_temp_table} AS
SELECT * FROM {edge_table}
{distribution}
""".format(**locals()))
# GPDB uses "DISTRIBUTED BY" clauses to distribute the rows across segments.
# For Postgres we add the index explicitly.
if is_platform_pg():
plpy.execute("CREATE INDEX ON {0}({1})".format(
edge_temp_table, src))
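# For illustration: with grouping_cols = 'grp' and dest = 'dest', the
# Greenplum branch above produces "DISTRIBUTED BY (grp,dest)", while the
# Postgres branch instead indexes the src column of the temp edge table.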
# Intermediate tables required.
cur = unique_string(desp='cur')
message = unique_string(desp='message')
cur_unconv = unique_string(desp='cur_unconv')
message_unconv = unique_string(desp='message_unconv')
out_cnts = unique_string(desp='out_cnts')
out_cnts_cnt = unique_string(desp='cnt')
v1 = unique_string(desp='v1')
if is_platform_pg():
cur_distribution = cnts_distribution = ''
else:
cur_distribution = cnts_distribution = "DISTRIBUTED BY ({0}{1})".format(
grouping_cols_comma, vertex_id)
cur_join_clause = """{edge_temp_table}.{dest} = {cur}.{vertex_id}
""".format(**locals())
out_cnts_join_clause = """{out_cnts}.{vertex_id} =
{edge_temp_table}.{src} """.format(**locals())
v1_join_clause = """{v1}.{vertex_id} = {edge_temp_table}.{src}
""".format(**locals())
# Get query params for Personalized Page Rank.
ppr_params = ''
total_ppr_nodes = 0
random_jump_prob_ppr = 0
ppr_init_value_clause = ''
if personalization_vertices:
ppr_params = get_query_params_for_ppr(personalization_vertices, damping_factor,
vertex_id, edge_temp_table, vertex_table, edge_params)
total_ppr_nodes = ppr_params[0]
random_jump_prob_ppr = ppr_params[1]
ppr_init_value_clause = ppr_params[2]
random_probability = (1.0 - damping_factor) / n_vertices
if total_ppr_nodes > 0:
random_jump_prob = random_jump_prob_ppr
else:
random_jump_prob = random_probability
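# Worked example of the standard (non-PPR) case: with the default
# damping_factor = 0.85 and a hypothetical n_vertices = 4,
# random_probability is (1.0 - 0.85) / 4 = 0.0375.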
####################################################################
# Create several strings that will be used to construct required
# queries. These strings will be required only during grouping.
ignore_group_clause_first = ''
limit = ' LIMIT 1 '
grouping_cols_select_pr = ''
vertices_per_group_inner_join_pr = ''
ignore_group_clause_pr = ''
grouping_cols_select_ins = ''
vpg_from_clause_ins = ''
vpg_where_clause_ins = ''
message_grp_where_ins = ''
ignore_group_clause_ins = ''
nodes_with_no_incoming_edges = unique_string(desp='no_incoming')
ignore_group_clause_ins_noincoming = ''
grouping_cols_select_conv = '{0}.{1}'.format(cur, vertex_id)
group_by_grouping_cols_conv = ''
message_grp_clause_conv = ''
ignore_group_clause_conv = ''
####################################################################
# Queries involving groups need many more conditions in various clauses,
# so populate the required variables here. Some intermediate tables are
# unnecessary when no grouping is involved, so create those tables and
# certain columns only during grouping.
if grouping_cols:
distinct_grp_table = unique_string(desp='grp')
plpy.execute(
"DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
plpy.execute("""CREATE TEMP TABLE {distinct_grp_table} AS
SELECT DISTINCT {grouping_cols} FROM {edge_temp_table}
""".format(**locals()))
vertices_per_group = unique_string(desp='nvert_grp')
init_pr = unique_string(desp='init')
random_prob = unique_string(desp='rand')
subq = unique_string(desp='subquery')
rand_damp = 1 - damping_factor
grouping_where_clause = _check_groups(
distinct_grp_table, subq, grouping_cols_list)
group_by_clause = get_table_qualified_col_str(
distinct_grp_table, grouping_cols_list)
# Find the number of vertices in each group; this is the normalizing
# factor for computing the random_prob
where_clause_ppr = ''
if personalization_vertices:
personalization_vertices_str = ','.join(
[str(i) for i in personalization_vertices])
where_clause_ppr = """
where __vertices__ = ANY(ARRAY[{personalization_vertices_str}])
""".format(**locals())
random_prob_grp = 1.0 - damping_factor
init_prob_grp = 1.0 / total_ppr_nodes
else:
personalization_vertices_str = ''
random_prob_grp = """
{rand_damp}/COUNT(__vertices__)::DOUBLE PRECISION
""".format(**locals())
init_prob_grp = """
1/COUNT(__vertices__)::DOUBLE PRECISION
""".format(**locals())
plpy.execute(
"DROP TABLE IF EXISTS {0}".format(vertices_per_group))
plpy.execute("""CREATE TEMP TABLE {vertices_per_group} AS
SELECT {distinct_grp_table}.*,
{init_prob_grp} AS {init_pr},
{random_prob_grp} as {random_prob}
FROM {distinct_grp_table} INNER JOIN (
SELECT {grouping_cols}, {src} AS __vertices__
FROM {edge_temp_table}
UNION
SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
){subq}
ON {grouping_where_clause}
{where_clause_ppr}
GROUP BY {group_by_clause}
""".format(**locals()))
grouping_where_clause = _check_groups(
vertices_per_group, subq, grouping_cols_list)
group_by_clause = get_table_qualified_col_str(
vertices_per_group, grouping_cols_list)
if personalization_vertices:
init_prob_grp_ppr = 1.0 / total_ppr_nodes
init_pr = """
CASE when __vertices__ = ANY(ARRAY[{personalization_vertices_str}])
THEN {init_prob_grp_ppr} ELSE 0 END
""".format(**locals())
plpy.execute("""
CREATE TEMP TABLE {cur} AS
SELECT {group_by_clause}, {subq}.__vertices__
AS {vertex_id}, {init_pr} AS pagerank
FROM {vertices_per_group} INNER JOIN (
SELECT {grouping_cols}, {src} AS __vertices__
FROM {edge_temp_table}
UNION
SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
){subq}
ON {grouping_where_clause}
{cur_distribution}
""".format(**locals()))
vpg = unique_string(desp='vpg')
# Compute the out-degree of every node in the group-based
# subgraphs.
plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
SELECT {grouping_cols_select} {src} AS {vertex_id},
COUNT({dest}) AS {out_cnts_cnt}
FROM {edge_temp_table}
GROUP BY {grouping_cols_select} {src}
{cnts_distribution}
""".format(grouping_cols_select=grouping_cols + ','
if grouping_cols else '', **locals()))
message_grp = _check_groups(cur, message, grouping_cols_list)
cur_join_clause = cur_join_clause + ' AND ' + _check_groups(
cur, edge_temp_table, grouping_cols_list)
out_cnts_join_clause = out_cnts_join_clause + ' AND ' \
+ _check_groups(out_cnts, edge_temp_table,
grouping_cols_list)
v1_join_clause = v1_join_clause + ' AND ' + _check_groups(
v1, edge_temp_table, grouping_cols_list)
vpg_join_clause = _check_groups(
vpg, edge_temp_table, grouping_cols_list)
vpg_t1_join_clause = _check_groups(
vpg, '__t1__', grouping_cols_list)
# join clause specific to populating random_prob for nodes without any
# incoming edges.
edge_grouping_cols_select = get_table_qualified_col_str(
edge_temp_table, grouping_cols_list)
cur_grouping_cols_select = get_table_qualified_col_str(
cur, grouping_cols_list)
# Create output summary table:
cols_names_types = get_cols_and_types(edge_table)
grouping_cols_clause = ', '.join([c_name + " " + c_type
for (c_name, c_type)
in cols_names_types
if c_name in grouping_cols_list])
plpy.execute("""
CREATE TABLE {summary_table} (
{grouping_cols_clause},
__iterations__ INTEGER
)
""".format(**locals()))
# Create the output table. This will be updated whenever a group converges.
# Note that vertex_id is assumed to be an integer (as described in the
# documentation)
plpy.execute("""
CREATE TABLE {out_table} (
{grouping_cols_clause},
{vertex_id} BIGINT,
pagerank DOUBLE PRECISION
)
""".format(**locals()))
temp_summary_table = unique_string(desp='temp_summary')
plpy.execute(
"DROP TABLE IF EXISTS {0}".format(temp_summary_table))
plpy.execute("""
CREATE TABLE {temp_summary_table} (
{grouping_cols_clause}
)
""".format(**locals()))
################################################################
# Strings required for the main PageRank computation query
grouping_cols_select_pr = edge_grouping_cols_select + ', '
if personalization_vertices:
random_jump_prob = random_jump_prob_ppr
else:
random_jump_prob = 'MIN({vpg}.{random_prob})'.format(
**locals())
vertices_per_group_inner_join_pr = """
INNER JOIN {vertices_per_group}
AS {vpg} ON {vpg_join_clause}""".format(**locals())
ignore_group_clause_pr = ' WHERE ' + \
get_filtered_cols_subquery_str(edge_temp_table,
summary_table,
grouping_cols_list)
ignore_group_clause_ins_noincoming = ' WHERE ' + \
get_filtered_cols_subquery_str(nodes_with_no_incoming_edges,
summary_table,
grouping_cols_list)
# Strings required for updating PageRank scores of vertices that have
# no incoming edges
grouping_cols_select_ins = cur_grouping_cols_select + ','
vpg_from_clause_ins = ', {vertices_per_group} AS {vpg}'.format(
**locals())
vpg_where_clause_ins = ' AND {vpg_t1_join_clause} '.format(
**locals())
message_grp_where_ins = 'WHERE {message_grp}'.format(**locals())
ignore_group_clause_ins = ' AND ' + get_filtered_cols_subquery_str(
cur, summary_table, grouping_cols_list)
# Strings required for convergence test query
grouping_cols_select_conv = cur_grouping_cols_select
group_by_grouping_cols_conv = ' GROUP BY {0}'.format(
cur_grouping_cols_select)
message_grp_clause_conv = '{0} AND '.format(message_grp)
ignore_group_clause_conv = ' AND ' + get_filtered_cols_subquery_str(
cur, summary_table, grouping_cols_list)
limit = ''
# Find all nodes, in each group, that have no incoming edges. The PageRank
# values of these nodes are not updated by the first query in the
# following for loop. They must be explicitly plugged back into the
# message table, with their corresponding group's random_prob as their
# PageRank values.
plpy.execute("""
CREATE TABLE {nodes_with_no_incoming_edges} AS
SELECT {select_group_cols}, __t1__.{src} AS {vertex_id},
{vpg}.{random_prob} AS pagerank
FROM {edge_temp_table} AS __t1__ {vpg_from_clause_ins}
WHERE NOT EXISTS (
SELECT 1
FROM {edge_temp_table} AS __t2__
WHERE __t1__.{src}=__t2__.{dest}
AND {where_group_clause}
) {vpg_where_clause_ins}
GROUP BY {select_group_cols}, __t1__.{src}, pagerank
""".format(
select_group_cols=get_table_qualified_col_str(
'__t1__', grouping_cols_list),
where_group_clause=_check_groups(
'__t1__', '__t2__', grouping_cols_list),
**locals()))
else:
# cur and out_cnts tables can be simpler when no grouping is
# involved.
if total_ppr_nodes > 0:
init_value = ppr_init_value_clause
else:
init_value = 1.0 / n_vertices
plpy.execute("""
CREATE TEMP TABLE {cur} AS
SELECT {vertex_id}, {init_value}::DOUBLE PRECISION AS pagerank
FROM {vertex_table}
{cur_distribution}
""".format(**locals()))
# Compute the out-degree of every node in the graph.
plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
SELECT {src} AS {vertex_id}, COUNT({dest}) AS {out_cnts_cnt}
FROM {edge_temp_table}
GROUP BY {src}
{cnts_distribution}
""".format(**locals()))
# The summary table when there is no grouping will contain only
# the iteration column. We don't need to create the out_table
# when no grouping is used since the 'cur' table will be renamed
# to out_table after pagerank computation is completed.
plpy.execute("""
CREATE TABLE {summary_table} (
__iterations__ INTEGER
)
""".format(**locals()))
# Find all nodes in the graph that don't have any incoming edges and
# assign random_prob as their pagerank values.
plpy.execute("""
CREATE TABLE {nodes_with_no_incoming_edges} AS
SELECT DISTINCT({src}), {random_probability} AS pagerank
FROM {edge_temp_table}
EXCEPT
(SELECT DISTINCT({dest}), {random_probability} AS pagerank
FROM {edge_temp_table})
""".format(**locals()))
unconverged = 0
iteration_num = 0
for iteration_num in range(max_iter):
################################################################
# PageRank for node 'A' at any given iteration 'i' is given by:
# PR_i(A) = damping_factor * (PR_{i-1}(B)/degree(B) +
#           PR_{i-1}(C)/degree(C) + ...) + (1 - damping_factor)/N
# where 'N' is the number of vertices in the graph,
# B, C are nodes that have edges to node A, and
# degree(node) represents the number of outgoing edges from 'node'
################################################################
# Essentially, the pagerank for a node is based on an aggregate of a
# fraction of the pagerank values of all the nodes that have incoming
# edges to it, along with a small random probability.
# More information can be found at:
# https://en.wikipedia.org/wiki/PageRank#Damping_factor
# The query below computes the PageRank of each node using the above
# formula. A short note on ignore_group_clause: it is used only when
# grouping is set, and carries the condition that skips the PageRank
# computation for groups that have already converged.
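# A small hand-computed sketch of the formula above (hypothetical 3-node
# graph with edges A->B, A->C, B->C; damping_factor d = 0.85; initial
# rank 1/3 per node):
#   PR_1(C) = d * (PR_0(A)/2 + PR_0(B)/1) + (1 - d)/3
#           = 0.85 * (1/6 + 1/3) + 0.05 = 0.475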
plpy.execute("""
CREATE TABLE {message} AS
SELECT {grouping_cols_select_pr}
{edge_temp_table}.{dest} AS {vertex_id},
SUM(({v1}.pagerank)/{out_cnts}.{out_cnts_cnt})*{damping_factor}+
{random_jump_prob} AS pagerank
FROM {edge_temp_table}
INNER JOIN {cur} ON {cur_join_clause}
INNER JOIN {out_cnts} ON {out_cnts_join_clause}
INNER JOIN {cur} AS {v1} ON {v1_join_clause}
{vertices_per_group_inner_join_pr}
{ignore_group_clause}
GROUP BY {grouping_cols_select_pr} {edge_temp_table}.{dest}
{cur_distribution}
""".format(ignore_group_clause=ignore_group_clause_pr
if iteration_num > 0 else ignore_group_clause_first,
**locals()))
# If there are nodes that have no incoming edges, they are not
# captured in the message table. Insert entries for such nodes,
# with random_prob.
plpy.execute("""
INSERT INTO {message}
SELECT *
FROM {nodes_with_no_incoming_edges}
{ignore_group_clause}
""".format(ignore_group_clause=ignore_group_clause_ins_noincoming
if iteration_num > 0 else ignore_group_clause_first,
**locals()))
# Check for convergence, but only if threshold != 0:
if threshold != 0:
# message_unconv and cur_unconv will contain the unconverged groups
# after the current and previous iterations respectively. Groups that
# are missing in message_unconv but appear in cur_unconv are the
# groups that have converged after this iteration's computations.
# If no grouping columns are specified, then we check if there is
# at least one unconverged node (LIMIT 1 is used in the query).
if iteration_num == 0 and grouping_cols:
# Hack to address a corner case:
# With grouping, if a graph converged in the very first
# iteration (a complete graph is an example of such a
# graph), the pagerank scores for that group were not
# showing up in the output. The following code simply
# prevents convergence in the first iteration.
plpy.execute("""
CREATE TEMP TABLE {message_unconv} AS
SELECT * FROM {distinct_grp_table}
""".format(**locals()))
else:
plpy.execute("""
CREATE TEMP TABLE {message_unconv} AS
SELECT {grouping_cols_select_conv}
FROM {message}
INNER JOIN {cur}
ON {cur}.{vertex_id}={message}.{vertex_id}
WHERE {message_grp_clause_conv}
ABS({cur}.pagerank-{message}.pagerank) > {threshold}
{ignore_group_clause}
{group_by_grouping_cols_conv}
{limit}
""".format(ignore_group_clause=ignore_group_clause_ins
if iteration_num > 0 else ignore_group_clause_conv,
**locals()))
unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
""".format(message_unconv))[0]["cnt"]
if iteration_num > 0 and grouping_cols:
# Update the result and summary tables for groups that
# have converged since the last iteration.
update_output_grouping_tables_for_link_analysis(
temp_summary_table,
iteration_num,
summary_table,
out_table,
message,
grouping_cols_list,
cur_unconv,
message_unconv)
plpy.execute("DROP TABLE IF EXISTS {0}".format(cur_unconv))
plpy.execute("""ALTER TABLE {message_unconv} RENAME TO
{cur_unconv} """.format(**locals()))
else:
# Do not run convergence test if threshold=0, since that implies
# the user wants to run max_iter iterations.
unconverged = 1
plpy.execute("DROP TABLE IF EXISTS {0}".format(cur))
plpy.execute("""ALTER TABLE {message} RENAME TO {cur}
""".format(**locals()))
if unconverged == 0:
break
# If some groups (or the entire table, when there is no grouping)
# are still unconverged, update the results.
if grouping_cols:
if unconverged > 0:
if threshold != 0:
# We completed max_iter iterations, but there are still some
# unconverged groups. Update the result and summary tables for
# the unconverged groups.
update_output_grouping_tables_for_link_analysis(
temp_summary_table,
iteration_num,
summary_table,
out_table,
cur,
grouping_cols_list,
cur_unconv)
else:
# No group has converged. The list of all group values is in
# distinct_grp_table.
update_output_grouping_tables_for_link_analysis(
temp_summary_table,
iteration_num,
summary_table,
out_table, cur,
grouping_cols_list,
distinct_grp_table)
# Normalize the computed pagerank values for Personalized
# PageRank.
# Ref:
# https://docs.oracle.com/cd/E56133_01/latest/reference/algorithms/pagerank.html
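# For example, with a hypothetical personalization_vertices = [1, 3]
# (total_ppr_nodes = 2), every pagerank score in the output is halved.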
if total_ppr_nodes > 1:
plpy.execute("""UPDATE {out_table} set pagerank =
pagerank / {total_ppr_nodes}::DOUBLE PRECISION
""".format(out_table=out_table,
total_ppr_nodes=total_ppr_nodes))
else:
# Normalize the computed pagerank values for Personalized
# PageRank.
# Ref:
# https://docs.oracle.com/cd/E56133_01/latest/reference/algorithms/pagerank.html
if total_ppr_nodes > 1:
plpy.execute("""UPDATE {table_name} set pagerank =
pagerank / {total_ppr_nodes}::DOUBLE PRECISION
""".format(table_name=cur,
total_ppr_nodes=total_ppr_nodes))
rename_table(schema_madlib, cur, out_table)
plpy.execute("""
INSERT INTO {summary_table} VALUES
({iteration_num}+1)
""".format(**locals()))
# Step 4: Cleanup
plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5},{6}
""".format(out_cnts, edge_temp_table, cur, message, cur_unconv,
message_unconv, nodes_with_no_incoming_edges))
if grouping_cols:
plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2}
""".format(vertices_per_group, temp_summary_table,
distinct_grp_table))
def get_query_params_for_ppr(personalization_vertices, damping_factor,
vertex_id, edge_temp_table, vertex_table, edge_params):
"""
This function will prepare the Join Clause and the condition to Calculate
the Personalized Page Rank and Returns Total number of user provided
personalization vertices, A join Clause and a clause to be added to existing
formula to calculate pagerank.
Args:
@param personalization_vertices
@param damping_factor
@param vertex_id
@param edge_temp_table
@param vertex_table
Returns :
(Integer, String, String)
"""
total_ppr_nodes = 0
ppr_random_prob_clause = ''
ppr_init_prob_clause = ''
if personalization_vertices:
total_ppr_nodes = len(personalization_vertices)
ppr_init_value = 1.0 / total_ppr_nodes
prob_value = 1.0 - damping_factor
dest = edge_params["dest"]
personalization_vertices_str = ','.join([str(i) for i in personalization_vertices])
# For PPR, assign the random jump probability only to the
# personalization_vertices. For the rest of the nodes, the random jump
# probability is zero.
ppr_random_prob_clause = """
CASE WHEN {edge_temp_table}.{dest} = ANY(ARRAY[{personalization_vertices_str}])
THEN {prob_value} ELSE 0 END
""".format(**locals())
ppr_init_prob_clause = """
CASE WHEN {vertex_id} = ANY(ARRAY[{personalization_vertices_str}])
THEN {ppr_init_value} ELSE 0 END
""".format(**locals())
return (total_ppr_nodes, ppr_random_prob_clause, ppr_init_prob_clause)
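# A hedged sketch of the return value (inputs are hypothetical): calling
# get_query_params_for_ppr with personalization_vertices = [1, 3] and
# damping_factor = 0.85 yields total_ppr_nodes = 2, a random-jump clause
# assigning (1 - 0.85) = 0.15 to rows whose dest is 1 or 3 (0 otherwise),
# and an init clause assigning 1/2 = 0.5 to vertices 1 and 3 (0 otherwise).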
def pagerank_help(schema_madlib, message, **kwargs):
"""
Help function for pagerank
Args:
@param schema_madlib
@param message: string, Help message string
@param kwargs
Returns:
String. Help/usage information
"""
if message is not None and \
message.lower() in ("usage", "help", "?"):
help_string = "Get from method below"
help_string = get_graph_usage(schema_madlib, 'PageRank',
"""out_table TEXT, -- Name of the output table for PageRank
damping_factor DOUBLE PRECISION, -- Damping factor in random surfer model
-- (DEFAULT = 0.85)
max_iter INTEGER, -- Maximum iteration number (DEFAULT = 100)
threshold DOUBLE PRECISION, -- Stopping criteria (DEFAULT = 1/(N*1000),
-- N is number of vertices in the graph)
grouping_col TEXT, -- Comma separated column names to group on
-- (DEFAULT = NULL, no grouping)
personalization_vertices ARRAY OF BIGINT, -- A comma-separated list of vertices
-- or nodes for personalized PageRank.
""") + """
A summary table is also created that contains information regarding the
number of iterations required for convergence. It is named by adding the
suffix '_summary' to the 'out_table' parameter.
"""
else:
help_string = """
----------------------------------------------------------------------------
SUMMARY
----------------------------------------------------------------------------
Given a directed graph, the PageRank algorithm finds the PageRank score of
every vertex in the graph.
--
For an overview on usage, run:
SELECT {schema_madlib}.pagerank('usage');
"""
return help_string.format(schema_madlib=schema_madlib)
# ---------------------------------------------------------------------
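# A hedged usage sketch (table and column names here are hypothetical;
# the argument order follows the pagerank() signature above):
#
#   SELECT madlib.pagerank('vertex',             -- vertex_table
#                          'id',                 -- vertex_id
#                          'edge',               -- edge_table
#                          'src=src, dest=dest', -- edge_args
#                          'pagerank_out',       -- out_table
#                          0.85,                 -- damping_factor
#                          100,                  -- max_iter
#                          NULL,                 -- threshold
#                          NULL,                 -- grouping_cols
#                          NULL);                -- personalization_vertices
#   SELECT * FROM pagerank_out ORDER BY pagerank DESC;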