blob: 71cddd24ec13ba3a785306f7c349d820c61d7a1d [file] [log] [blame]
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# PageRank
# Please refer to the pagerank.sql_in file for the documentation
"""
@file pagerank.py_in
@namespace graph
"""
import plpy
from graph_utils import get_graph_usage
from graph_utils import get_default_threshold_for_link_analysis
from graph_utils import update_output_grouping_tables_for_link_analysis
from graph_utils import validate_graph_coding
from graph_utils import validate_output_and_summary_tables
from graph_utils import validate_params_for_link_analysis
from utilities.control import MinWarning
from utilities.control import OptimizerControl
from utilities.utilities import _assert
from utilities.utilities import _check_groups
from utilities.utilities import get_filtered_cols_subquery_str
from utilities.utilities import get_table_qualified_col_str
from utilities.utilities import add_postfix
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string, split_quoted_delimited_str
from utilities.utilities import is_platform_pg
from utilities.utilities import py_list_to_sql_string
from utilities.validate_args import columns_exist_in_table, get_cols_and_types
from utilities.validate_args import table_exists
def validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
                           edge_params, out_table, damping_factor, max_iter,
                           threshold, grouping_cols_list, personalization_vertices):
    """
    Validate the input parameters for PageRank.

    Checks the graph encoding (vertex/edge tables and columns), the shared
    link-analysis parameters (threshold, max_iter, grouping columns), the
    damping factor range, and — for Personalized PageRank — that the given
    personalization vertices form a valid, duplicate-free subset of the
    vertex table in every group. Raises via plpy.error/_assert on failure.
    """
    validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
                          out_table, 'PageRank')
    # threshold and max_iter checks are shared with the other link-analysis
    # algorithms, so they live in a common helper.
    validate_params_for_link_analysis(schema_madlib, "PageRank",
                                      threshold, max_iter,
                                      edge_table, grouping_cols_list)
    _assert(0.0 <= damping_factor <= 1.0,
            "PageRank: Invalid damping factor value ({0}), must be between 0 and 1.".
            format(damping_factor))

    # Nothing more to check unless Personalized PageRank was requested.
    if not personalization_vertices:
        return

    src_col = edge_params["src"]
    dest_col = edge_params["dest"]
    qualified_grouping_cols = get_table_qualified_col_str(
        edge_table, grouping_cols_list)
    if grouping_cols_list:
        group_by_clause = "GROUP BY {0}".format(qualified_grouping_cols)
    else:
        group_by_clause = ''
    # Count, per group, how many of the personalization vertices actually
    # appear in the graph.
    per_group_counts = plpy.execute("""
        SELECT count(distinct {vertex_id}) AS count
        FROM {vertex_table}
        RIGHT JOIN {edge_table}
            ON ({vertex_table}.{vertex_id} = {edge_table}.{src}
                OR {vertex_table}.{vertex_id} = {edge_table}.{dest})
            AND {vertex_table}.{vertex_id} = ANY(ARRAY{personalization_vertices})
        {group_by_clause}
        """.format(vertex_id=vertex_id, vertex_table=vertex_table,
                   edge_table=edge_table, src=src_col, dest=dest_col,
                   personalization_vertices=personalization_vertices,
                   group_by_clause=group_by_clause))

    # Every group must match the length of the user-supplied list exactly.
    # A mismatch means duplicates in the input, vertices missing from some
    # group, or vertices that do not exist in the vertex table at all.
    expected = len(personalization_vertices)
    for row in per_group_counts:
        if row["count"] != expected:
            plpy.error("Personalization nodes must be a subset "
                       "of the vertex_table without duplicates and "
                       "every nodes should be present in all the groups")
def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_table,
             damping_factor, max_iter, threshold, grouping_cols, personalization_vertices, **kwargs):
    """
    Function that computes the PageRank

    Args:
        @param schema_madlib: schema where MADlib is installed
        @param vertex_table: table with one row per vertex
        @param vertex_id: vertex id column in vertex_table (default 'id')
        @param edge_table: table of directed edges
        @param edge_args: comma-delimited string naming the edge columns,
                          e.g. 'src=src, dest=dest'
        @param out_table: output table that will hold the final scores;
                          '<out_table>_summary' is created alongside it
        @param damping_factor: random-surfer damping factor (default 0.85)
        @param max_iter: maximum number of iterations (default 100)
        @param threshold: convergence threshold; None picks a default based
                          on the vertex count, 0 disables the convergence
                          test so exactly max_iter iterations run
        @param grouping_cols: comma-delimited columns to group edges by;
                              PageRank is then computed per group
        @param personalization_vertices: list of vertices for Personalized
                                         PageRank; falsy for plain PageRank
    """
    # We noticed the current implementation is noticeably slower when orca=ON
    # on Greenplum 5, so that we temporarily turn if off to run pagerank.
    with OptimizerControl(False):
        with MinWarning('warning'):
            # Parse the 'src=..., dest=...' edge argument string.
            params_types = {'src': str, 'dest': str}
            default_args = {'src': 'src', 'dest': 'dest'}
            edge_params = extract_keyvalue_params(
                edge_args, params_types, default_args)
            # populate default values for optional params if null
            if damping_factor is None:
                damping_factor = 0.85
            if max_iter is None:
                max_iter = 100
            if not vertex_id:
                vertex_id = "id"
            if not grouping_cols:
                grouping_cols = ''
            grouping_cols_list = split_quoted_delimited_str(grouping_cols)
            validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
                                   edge_params, out_table, damping_factor,
                                   max_iter, threshold, grouping_cols_list,
                                   personalization_vertices)
            summary_table = add_postfix(out_table, "_summary")
            _assert(not table_exists(summary_table),
                    "Graph PageRank: Output summary table ({summary_table}) already exists."
                    .format(**locals()))
            src = edge_params["src"]
            dest = edge_params["dest"]
            # Total vertex count 'N' used in the (1-d)/N random-jump term
            # and for the default threshold.
            n_vertices = plpy.execute("""
                SELECT COUNT({0}) AS cnt
                FROM {1}
                """.format(vertex_id, vertex_table))[0]["cnt"]
            if threshold is None:
                threshold = get_default_threshold_for_link_analysis(n_vertices)

            # table/column names used when grouping_cols is set.
            distinct_grp_table = ''
            vertices_per_group = ''
            vpg = ''
            grouping_where_clause = ''
            group_by_clause = ''
            random_prob = ''
            # NOTE(review): ppr_join_clause is initialized here but is never
            # reassigned or read again in this function — looks dead.
            ppr_join_clause = ''

            # Work on a temp copy of the edges. On Greenplum it is
            # distributed by (grouping cols, dest); Postgres has no
            # distribution clause.
            edge_temp_table = unique_string(desp='temp_edge')
            grouping_cols_comma = grouping_cols + ',' if grouping_cols else ''
            distribution = ('' if is_platform_pg() else
                            "DISTRIBUTED BY ({0}{1})".format(
                                grouping_cols_comma, dest))
            plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table))
            plpy.execute("""
                CREATE TEMP TABLE {edge_temp_table} AS
                SELECT * FROM {edge_table}
                {distribution}
                """.format(**locals()))
            # GPDB has "distributed by" clauses to help with indexing.
            # For Postgres we add the index explicitly.
            if is_platform_pg():
                plpy.execute("CREATE INDEX ON {0}({1})".format(
                    edge_temp_table, src))

            # Intermediate tables required.
            cur = unique_string(desp='cur')
            message = unique_string(desp='message')
            cur_unconv = unique_string(desp='cur_unconv')
            message_unconv = unique_string(desp='message_unconv')
            out_cnts = unique_string(desp='out_cnts')
            out_cnts_cnt = unique_string(desp='cnt')
            v1 = unique_string(desp='v1')
            if is_platform_pg():
                cur_distribution = cnts_distribution = ''
            else:
                cur_distribution = cnts_distribution = "DISTRIBUTED BY ({0}{1})".format(
                    grouping_cols_comma, vertex_id)
            # Join clauses used repeatedly in the iteration query below.
            cur_join_clause = """{edge_temp_table}.{dest} = {cur}.{vertex_id}
                """.format(**locals())
            out_cnts_join_clause = """{out_cnts}.{vertex_id} =
                {edge_temp_table}.{src} """.format(**locals())
            v1_join_clause = """{v1}.{vertex_id} = {edge_temp_table}.{src}
                """.format(**locals())

            # Get query params for Personalized Page Rank.
            ppr_params = ''
            total_ppr_nodes = 0
            random_jump_prob_ppr = 0
            ppr_init_value_clause = ''
            if personalization_vertices:
                ppr_params = get_query_params_for_ppr(personalization_vertices, damping_factor,
                                                      vertex_id, edge_temp_table, vertex_table, edge_params)
                total_ppr_nodes = ppr_params[0]
                random_jump_prob_ppr = ppr_params[1]
                ppr_init_value_clause = ppr_params[2]
            # Plain PageRank random-jump term: (1 - d) / N.
            random_probability = (1.0 - damping_factor) / n_vertices
            if total_ppr_nodes > 0:
                random_jump_prob = random_jump_prob_ppr
            else:
                random_jump_prob = random_probability

            ####################################################################
            # Create several strings that will be used to construct required
            # queries. These strings will be required only during grouping.
            ignore_group_clause_first = ''
            limit = ' LIMIT 1 '
            grouping_cols_select_pr = ''
            vertices_per_group_inner_join_pr = ''
            ignore_group_clause_pr = ''
            grouping_cols_select_ins = ''
            vpg_from_clause_ins = ''
            vpg_where_clause_ins = ''
            message_grp_where_ins = ''
            ignore_group_clause_ins = ''
            nodes_with_no_incoming_edges = unique_string(desp='no_incoming')
            ignore_group_clause_ins_noincoming = ''
            grouping_cols_select_conv = '{0}.{1}'.format(cur, vertex_id)
            group_by_grouping_cols_conv = ''
            message_grp_clause_conv = ''
            ignore_group_clause_conv = ''
            ####################################################################
            # Queries when groups are involved need a lot more conditions in
            # various clauses, so populating the required variables. Some intermediate
            # tables are unnecessary when no grouping is involved, so create some
            # tables and certain columns only during grouping.
            if grouping_cols:
                # One row per distinct grouping-column combination.
                distinct_grp_table = unique_string(desp='grp')
                plpy.execute(
                    "DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
                plpy.execute("""CREATE TEMP TABLE {distinct_grp_table} AS
                    SELECT DISTINCT {grouping_cols} FROM {edge_temp_table}
                    """.format(**locals()))
                vertices_per_group = unique_string(desp='nvert_grp')
                init_pr = unique_string(desp='init')
                random_prob = unique_string(desp='rand')
                subq = unique_string(desp='subquery')
                rand_damp = 1 - damping_factor
                grouping_where_clause = _check_groups(
                    distinct_grp_table, subq, grouping_cols_list)
                group_by_clause = get_table_qualified_col_str(
                    distinct_grp_table, grouping_cols_list)
                # Find number of vertices in each group, this is the normalizing
                # factor for computing the random_prob
                where_clause_ppr = ''
                if personalization_vertices:
                    # For PPR only the personalization vertices count, and
                    # the random-jump mass is not normalized by group size.
                    where_clause_ppr = """
                        where __vertices__ = ANY(ARRAY{personalization_vertices})
                        """.format(**locals())
                    random_prob_grp = 1.0 - damping_factor
                    init_prob_grp = 1.0 / total_ppr_nodes
                else:
                    random_prob_grp = """
                        {rand_damp}/COUNT(__vertices__)::DOUBLE PRECISION
                        """.format(**locals())
                    init_prob_grp = """
                        1/COUNT(__vertices__)::DOUBLE PRECISION
                        """.format(**locals())
                plpy.execute(
                    "DROP TABLE IF EXISTS {0}".format(vertices_per_group))
                # Per-group vertex set with that group's initial score and
                # random-jump probability.
                plpy.execute("""CREATE TEMP TABLE {vertices_per_group} AS
                    SELECT {distinct_grp_table}.*,
                    {init_prob_grp} AS {init_pr},
                    {random_prob_grp} as {random_prob}
                    FROM {distinct_grp_table} INNER JOIN (
                        SELECT {grouping_cols}, {src} AS __vertices__
                        FROM {edge_temp_table}
                        UNION
                        SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
                    ){subq}
                    ON {grouping_where_clause}
                    {where_clause_ppr}
                    GROUP BY {group_by_clause}
                    """.format(**locals()))

                grouping_where_clause = _check_groups(
                    vertices_per_group, subq, grouping_cols_list)
                group_by_clause = get_table_qualified_col_str(
                    vertices_per_group, grouping_cols_list)
                if personalization_vertices:
                    # PPR: personalization vertices start at 1/|P|, every
                    # other vertex starts at 0.
                    init_prob_grp_ppr = 1.0 / total_ppr_nodes
                    init_pr = """
                        CASE when __vertices__ = ANY(ARRAY{personalization_vertices})
                        THEN {init_prob_grp_ppr} ELSE 0 END
                        """.format(**locals())
                # Initial per-group score table ('cur').
                plpy.execute("""
                    CREATE TEMP TABLE {cur} AS
                    SELECT {group_by_clause}, {subq}.__vertices__
                    AS {vertex_id}, {init_pr} AS pagerank
                    FROM {vertices_per_group} INNER JOIN (
                        SELECT {grouping_cols}, {src} AS __vertices__
                        FROM {edge_temp_table}
                        UNION
                        SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
                    ){subq}
                    ON {grouping_where_clause}
                    {cur_distribution}
                    """.format(**locals()))
                vpg = unique_string(desp='vpg')

                # Compute the out-degree of every node in the group-based
                # subgraphs.
                plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
                plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
                    SELECT {grouping_cols_select} {src} AS {vertex_id},
                           COUNT({dest}) AS {out_cnts_cnt}
                    FROM {edge_temp_table}
                    GROUP BY {grouping_cols_select} {src}
                    {cnts_distribution}
                    """.format(grouping_cols_select=grouping_cols + ','
                               if grouping_cols else '', **locals()))

                # Group-equality predicates for the various joins below.
                message_grp = _check_groups(cur, message, grouping_cols_list)
                cur_join_clause = cur_join_clause + ' AND ' + _check_groups(
                    cur, edge_temp_table, grouping_cols_list)
                out_cnts_join_clause = out_cnts_join_clause + ' AND ' \
                    + _check_groups(out_cnts, edge_temp_table,
                                    grouping_cols_list)
                v1_join_clause = v1_join_clause + ' AND ' + _check_groups(
                    v1, edge_temp_table, grouping_cols_list)
                vpg_join_clause = _check_groups(
                    vpg, edge_temp_table, grouping_cols_list)
                vpg_t1_join_clause = _check_groups(
                    vpg, '__t1__', grouping_cols_list)
                # join clause specific to populating random_prob for nodes without any
                # incoming edges.
                edge_grouping_cols_select = get_table_qualified_col_str(
                    edge_temp_table, grouping_cols_list)
                cur_grouping_cols_select = get_table_qualified_col_str(
                    cur, grouping_cols_list)
                # Create output summary table:
                cols_names_types = get_cols_and_types(edge_table)
                grouping_cols_clause = ', '.join([c_name + " " + c_type
                                                  for (c_name, c_type)
                                                  in cols_names_types
                                                  if c_name in grouping_cols_list])
                plpy.execute("""
                    CREATE TABLE {summary_table} (
                        {grouping_cols_clause},
                        __iterations__ INTEGER
                    )
                    """.format(**locals()))
                # Create output table. This will be updated whenever a group converges
                # Note that vertex_id is assumed to be an integer (as described in
                # documentation)
                plpy.execute("""
                    CREATE TABLE {out_table} (
                        {grouping_cols_clause},
                        {vertex_id} INTEGER,
                        pagerank DOUBLE PRECISION
                    )
                    """.format(**locals()))
                temp_summary_table = unique_string(desp='temp_summary')
                plpy.execute(
                    "DROP TABLE IF EXISTS {0}".format(temp_summary_table))
                plpy.execute("""
                    CREATE TABLE {temp_summary_table} (
                        {grouping_cols_clause}
                    )
                    """.format(**locals()))
                ################################################################
                # Strings required for the main PageRank computation query
                grouping_cols_select_pr = edge_grouping_cols_select + ', '
                if personalization_vertices:
                    random_jump_prob = random_jump_prob_ppr
                else:
                    random_jump_prob = 'MIN({vpg}.{random_prob})'.format(
                        **locals())
                vertices_per_group_inner_join_pr = """
                    INNER JOIN {vertices_per_group}
                    AS {vpg} ON {vpg_join_clause}""".format(**locals())
                ignore_group_clause_pr = ' WHERE ' + \
                    get_filtered_cols_subquery_str(edge_temp_table,
                                                   summary_table,
                                                   grouping_cols_list)
                ignore_group_clause_ins_noincoming = ' WHERE ' + \
                    get_filtered_cols_subquery_str(nodes_with_no_incoming_edges,
                                                   summary_table,
                                                   grouping_cols_list)
                # Strings required for updating PageRank scores of vertices that have
                # no incoming edges
                grouping_cols_select_ins = cur_grouping_cols_select + ','
                vpg_from_clause_ins = ', {vertices_per_group} AS {vpg}'.format(
                    **locals())
                vpg_where_clause_ins = ' AND {vpg_t1_join_clause} '.format(
                    **locals())
                message_grp_where_ins = 'WHERE {message_grp}'.format(**locals())
                ignore_group_clause_ins = ' AND ' + get_filtered_cols_subquery_str(
                    cur, summary_table, grouping_cols_list)
                # Strings required for convergence test query
                grouping_cols_select_conv = cur_grouping_cols_select
                group_by_grouping_cols_conv = ' GROUP BY {0}'.format(
                    cur_grouping_cols_select)
                message_grp_clause_conv = '{0} AND '.format(message_grp)
                ignore_group_clause_conv = ' AND ' + get_filtered_cols_subquery_str(
                    cur, summary_table, grouping_cols_list)
                limit = ''

                # Find all nodes, in each group, that have no incoming edges. The PageRank
                # value of these nodes are not updated using the first query in the
                # following for loop. They must be explicitly plugged back in to the
                # message table, with their corresponding group's random_prob as their
                # PageRank values.
                plpy.execute("""
                    CREATE TABLE {nodes_with_no_incoming_edges} AS
                    SELECT {select_group_cols}, __t1__.{src} AS {vertex_id},
                            {vpg}.{random_prob} AS pagerank
                    FROM {edge_temp_table} AS __t1__ {vpg_from_clause_ins}
                    WHERE NOT EXISTS (
                        SELECT 1
                        FROM {edge_temp_table} AS __t2__
                        WHERE __t1__.{src}=__t2__.{dest}
                            AND {where_group_clause}
                    ) {vpg_where_clause_ins}
                    GROUP BY {select_group_cols}, {vertex_id}, pagerank
                    """.format(
                    select_group_cols=get_table_qualified_col_str(
                        '__t1__', grouping_cols_list),
                    where_group_clause=_check_groups(
                        '__t1__', '__t2__', grouping_cols_list),
                    **locals()))
            else:
                # cur and out_cnts tables can be simpler when no grouping is
                # involved.
                if total_ppr_nodes > 0:
                    init_value = ppr_init_value_clause
                else:
                    init_value = 1.0 / n_vertices
                plpy.execute("""
                    CREATE TEMP TABLE {cur} AS
                    SELECT {vertex_id}, {init_value}::DOUBLE PRECISION AS pagerank
                    FROM {vertex_table}
                    {cur_distribution}
                    """.format(**locals()))

                # Compute the out-degree of every node in the graph.
                plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
                plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
                    SELECT {src} AS {vertex_id}, COUNT({dest}) AS {out_cnts_cnt}
                    FROM {edge_temp_table}
                    GROUP BY {src}
                    {cnts_distribution}
                    """.format(**locals()))
                # The summary table when there is no grouping will contain only
                # the iteration column. We don't need to create the out_table
                # when no grouping is used since the 'cur' table will be renamed
                # to out_table after pagerank computation is completed.
                plpy.execute("""
                    CREATE TABLE {summary_table} (
                        __iterations__ INTEGER
                    )
                    """.format(**locals()))
                # Find all nodes in the graph that don't have any incoming edges and
                # assign random_prob as their pagerank values.
                plpy.execute("""
                    CREATE TABLE {nodes_with_no_incoming_edges} AS
                    SELECT DISTINCT({src}), {random_probability} AS pagerank
                    FROM {edge_temp_table}
                    EXCEPT
                        (SELECT DISTINCT({dest}), {random_probability} AS pagerank
                        FROM {edge_temp_table})
                    """.format(**locals()))
            unconverged = 0
            iteration_num = 0
            for iteration_num in range(max_iter):
                ################################################################
                # PageRank for node 'A' at any given iteration 'i' is given by:
                # PR_i(A) = damping_factor(PR_i-1(B)/degree(B) +
                #           PR_i-1(C)/degree(C) + ...) + (1-damping_factor)/N
                # where 'N' is the number of vertices in the graph,
                # B, C are nodes that have edges to node A, and
                # degree(node) represents the number of outgoing edges from 'node'
                ################################################################
                # Essentially, the pagerank for a node is based on an aggregate of a
                # fraction of the pagerank values of all the nodes that have incoming
                # edges to it, along with a small random probability.
                # More information can be found at:
                # https://en.wikipedia.org/wiki/PageRank#Damping_factor

                # The query below computes the PageRank of each node using the above
                # formula. A small explanatory note on ignore_group_clause:
                # This is used only when grouping is set. This essentially will have
                # the condition that will help skip the PageRank computation on groups
                # that have converged.
                plpy.execute("""
                    CREATE TABLE {message} AS
                    SELECT {grouping_cols_select_pr}
                            {edge_temp_table}.{dest} AS {vertex_id},
                            SUM(({v1}.pagerank)/{out_cnts}.{out_cnts_cnt})*{damping_factor}+
                            {random_jump_prob} AS pagerank
                    FROM {edge_temp_table}
                        INNER JOIN {cur} ON {cur_join_clause}
                        INNER JOIN {out_cnts} ON {out_cnts_join_clause}
                        INNER JOIN {cur} AS {v1} ON {v1_join_clause}
                        {vertices_per_group_inner_join_pr}
                    {ignore_group_clause}
                    GROUP BY {grouping_cols_select_pr} {edge_temp_table}.{dest}
                    {cur_distribution}
                    """.format(ignore_group_clause=ignore_group_clause_pr
                               if iteration_num > 0 else ignore_group_clause_first,
                               **locals()))
                # If there are nodes that have no incoming edges, they are not
                # captured in the message table. Insert entries for such nodes,
                # with random_prob.
                plpy.execute("""
                    INSERT INTO {message}
                    SELECT *
                    FROM {nodes_with_no_incoming_edges}
                    {ignore_group_clause}
                    """.format(ignore_group_clause=ignore_group_clause_ins_noincoming
                               if iteration_num > 0 else ignore_group_clause_first,
                               **locals()))

                # Check for convergence:
                # Check for convergence only if threshold != 0.
                if threshold != 0:
                    # message_unconv and cur_unconv will contain the unconverged groups
                    # after current # and previous iterations respectively. Groups that
                    # are missing in message_unconv but appear in cur_unconv are the
                    # groups that have converged after this iteration's computations.
                    # If no grouping columns are specified, then we check if there is
                    # at least one unconverged node (limit 1 is used in the
                    # query).
                    if iteration_num == 0 and grouping_cols:
                        # Hack to address corner case:
                        # With grouping, if there was a graph that converged in
                        # the very first iteration (a complete graph is an eg.
                        # of such a graph), then the pagerank scores for that
                        # group was not showing up in the output. The following
                        # code just prevents convergence in the first iteration.
                        plpy.execute("""
                            CREATE TEMP TABLE {message_unconv} AS
                            SELECT * FROM {distinct_grp_table}
                            """.format(**locals()))
                    else:
                        plpy.execute("""
                            CREATE TEMP TABLE {message_unconv} AS
                            SELECT {grouping_cols_select_conv}
                            FROM {message}
                            INNER JOIN {cur}
                            ON {cur}.{vertex_id}={message}.{vertex_id}
                            WHERE {message_grp_clause_conv}
                                ABS({cur}.pagerank-{message}.pagerank) > {threshold}
                            {ignore_group_clause}
                            {group_by_grouping_cols_conv}
                            {limit}
                            """.format(ignore_group_clause=ignore_group_clause_ins
                                       if iteration_num > 0 else ignore_group_clause_conv,
                                       **locals()))
                    unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
                        """.format(message_unconv))[0]["cnt"]
                    if iteration_num > 0 and grouping_cols:
                        # Update result and summary tables for groups that have
                        # converged
                        # since the last iteration.
                        update_output_grouping_tables_for_link_analysis(
                            temp_summary_table,
                            iteration_num,
                            summary_table,
                            out_table,
                            message,
                            grouping_cols_list,
                            cur_unconv,
                            message_unconv)
                    plpy.execute("DROP TABLE IF EXISTS {0}".format(cur_unconv))
                    plpy.execute("""ALTER TABLE {message_unconv} RENAME TO
                        {cur_unconv} """.format(**locals()))
                else:
                    # Do not run convergence test if threshold=0, since that implies
                    # the user wants to run max_iter iterations.
                    unconverged = 1
                # The freshly computed 'message' scores become 'cur' for the
                # next iteration.
                plpy.execute("DROP TABLE IF EXISTS {0}".format(cur))
                plpy.execute("""ALTER TABLE {message} RENAME TO {cur}
                    """.format(**locals()))
                if unconverged == 0:
                    break

            # If there still are some unconverged groups/(entire table),
            # update results.
            if grouping_cols:
                if unconverged > 0:
                    if threshold != 0:
                        # We completed max_iters, but there are still some unconverged
                        # groups # Update the result and summary tables for unconverged
                        # groups.
                        update_output_grouping_tables_for_link_analysis(
                            temp_summary_table,
                            iteration_num,
                            summary_table,
                            out_table,
                            cur,
                            grouping_cols_list,
                            cur_unconv)
                    else:
                        # No group has converged. List of all group values are in
                        # distinct_grp_table.
                        update_output_grouping_tables_for_link_analysis(
                            temp_summary_table,
                            iteration_num,
                            summary_table,
                            out_table, cur,
                            grouping_cols_list,
                            distinct_grp_table)
                # updating the calculated pagerank value in case of
                # Personalized Page Rank.
                # Ref :
                # https://docs.oracle.com/cd/E56133_01/latest/reference/algorithms/pagerank.html
                if total_ppr_nodes > 1:
                    plpy.execute("""UPDATE {out_table} set pagerank =
                        pagerank / {total_ppr_nodes}::DOUBLE PRECISION
                        """.format(out_table=out_table,
                                   total_ppr_nodes=total_ppr_nodes))
            else:
                # updating the calculated pagerank value in case of
                # Personalized Page Rank.
                # Ref :
                # https://docs.oracle.com/cd/E56133_01/latest/reference/algorithms/pagerank.html
                if total_ppr_nodes > 1:
                    plpy.execute("""UPDATE {table_name} set pagerank =
                        pagerank / {total_ppr_nodes}::DOUBLE PRECISION
                        """.format(table_name=cur,
                                   total_ppr_nodes=total_ppr_nodes))
                # No grouping: the working table itself becomes the output.
                plpy.execute("""
                    ALTER TABLE {table_name}
                    RENAME TO {out_table}
                    """.format(table_name=cur, **locals()))
                plpy.execute("""
                    INSERT INTO {summary_table} VALUES
                    ({iteration_num}+1)
                    """.format(**locals()))

            # Step 4: Cleanup
            plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5},{6}
                """.format(out_cnts, edge_temp_table, cur, message, cur_unconv,
                           message_unconv, nodes_with_no_incoming_edges))
            if grouping_cols:
                plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2}
                    """.format(vertices_per_group, temp_summary_table,
                               distinct_grp_table))
def get_query_params_for_ppr(personalization_vertices, damping_factor,
                             vertex_id, edge_temp_table, vertex_table, edge_params):
    """
    Build the SQL fragments needed for Personalized PageRank (PPR).

    Produces the number of user-provided personalization vertices, a CASE
    expression assigning the random-jump probability only to those vertices,
    and a CASE expression giving their initial PageRank value. When no
    personalization vertices are given, plain-PageRank placeholders are
    returned instead.

    Args:
        @param personalization_vertices
        @param damping_factor
        @param vertex_id
        @param edge_temp_table
        @param vertex_table
        @param edge_params
    Returns :
        (Integer, String, String)
    """
    if not personalization_vertices:
        # Plain PageRank: no PPR clauses required.
        return (0, '', '')

    node_count = len(personalization_vertices)
    init_score = 1.0 / node_count
    jump_prob = 1.0 - damping_factor
    dest_col = edge_params["dest"]

    # In case of PPR, assign the random jump probability to the
    # personalization vertices only; every other node gets zero.
    random_prob_sql = """
        CASE WHEN {edge_temp_table}.{dest} = ANY(ARRAY{personalization_vertices})
        THEN {prob_value} ELSE 0 END
        """.format(edge_temp_table=edge_temp_table,
                   dest=dest_col,
                   personalization_vertices=personalization_vertices,
                   prob_value=jump_prob)
    # Initial score: 1/|P| for personalization vertices, zero otherwise.
    init_prob_sql = """
        CASE WHEN {vertex_id} = ANY(ARRAY{personalization_vertices})
        THEN {ppr_init_value} ELSE 0 END
        """.format(vertex_id=vertex_id,
                   personalization_vertices=personalization_vertices,
                   ppr_init_value=init_score)
    return (node_count, random_prob_sql, init_prob_sql)
def pagerank_help(schema_madlib, message, **kwargs):
    """
    Help function for pagerank

    Args:
        @param schema_madlib
        @param message: string, Help message string
        @param kwargs

    Returns:
        String. Help/usage information

    Fixes over the previous version: removed the dead
    'help_string = "Get from method below"' assignment (it was always
    overwritten), flattened the nested else/if into elif, and corrected
    user-facing typos (Vertex, delimited, separated, Personalized).
    """
    if message is not None and message.lower() in ("usage", "help", "?"):
        help_string = get_graph_usage(
            schema_madlib, 'PageRank',
            """out_table TEXT,  -- Name of the output table for PageRank
    damping_factor DOUBLE PRECISION, -- Damping factor in random surfer model
                                     -- (DEFAULT = 0.85)
    max_iter INTEGER,                -- Maximum iteration number (DEFAULT = 100)
    threshold DOUBLE PRECISION,      -- Stopping criteria (DEFAULT = 1/(N*1000),
                                     -- N is number of vertices in the graph)
    grouping_col TEXT,               -- Comma separated column names to group on
                                     -- (DEFAULT = NULL, no grouping)
    personalization_vertices ARRAY OF INTEGER, -- A comma separated list of vertices
                                               -- or nodes for personalized page rank.
""") + """
A summary table is also created that contains information regarding the
number of iterations required for convergence. It is named by adding the
suffix '_summary' to the 'out_table' parameter.
"""
    elif message is not None and message.lower() in ("example", "examples"):
        help_string = """
----------------------------------------------------------------------------
                                EXAMPLES
----------------------------------------------------------------------------
-- Create a graph, represented as vertex and edge tables.
DROP TABLE IF EXISTS vertex, edge;
CREATE TABLE vertex(
        id INTEGER
        );
CREATE TABLE edge(
        src INTEGER,
        dest INTEGER,
        user_id INTEGER
        );
INSERT INTO vertex VALUES
(0),
(1),
(2),
(3),
(4),
(5),
(6);
INSERT INTO edge VALUES
(0, 1, 1),
(0, 2, 1),
(0, 4, 1),
(1, 2, 1),
(1, 3, 1),
(2, 3, 1),
(2, 5, 1),
(2, 6, 1),
(3, 0, 1),
(4, 0, 1),
(5, 6, 1),
(6, 3, 1),
(0, 1, 2),
(0, 2, 2),
(0, 4, 2),
(1, 2, 2),
(1, 3, 2),
(2, 3, 2),
(3, 0, 2),
(4, 0, 2),
(5, 6, 2),
(6, 3, 2);

-- Compute the PageRank:
DROP TABLE IF EXISTS pagerank_out, pagerank_out_summary;
SELECT madlib.pagerank(
                'vertex',             -- Vertex table
                'id',                 -- Vertex id column
                'edge',               -- Edge table
                'src=src, dest=dest', -- Comma delimited string of edge arguments
                'pagerank_out');      -- Output table of PageRank

-- View the PageRank of all vertices, sorted by their scores.
SELECT * FROM pagerank_out ORDER BY pagerank DESC;

-- View the summary table to find the number of iterations required for
-- convergence.
SELECT * FROM pagerank_out_summary;

-- Compute PageRank of nodes associated with each user:
DROP TABLE IF EXISTS pagerank_out, pagerank_out_summary;
SELECT madlib.pagerank(
                'vertex',             -- Vertex table
                'id',                 -- Vertex id column
                'edge',               -- Edge table
                'src=src, dest=dest', -- Comma delimited string of edge arguments
                'pagerank_out',       -- Output table of PageRank
                NULL,                 -- Default damping factor
                NULL,                 -- Default max_iter
                0.00000001,           -- Threshold
                'user_id');           -- Grouping column

-- View the PageRank of all vertices, sorted by their scores.
SELECT * FROM pagerank_out ORDER BY user_id, pagerank DESC;

-- View the summary table to find the number of iterations required for
-- convergence for each group.
SELECT * FROM pagerank_out_summary;

-- Compute the Personalized PageRank:
DROP TABLE IF EXISTS pagerank_out, pagerank_out_summary;
SELECT madlib.pagerank(
                'vertex',             -- Vertex table
                'id',                 -- Vertex id column
                'edge',               -- Edge table
                'src=src, dest=dest', -- Comma delimited string of edge arguments
                'pagerank_out',       -- Output table of PageRank
                NULL,                 -- Default damping factor (0.85)
                NULL,                 -- Default max iters (100)
                NULL,                 -- Default Threshold
                NULL,                 -- No Grouping
                ARRAY[2,4]);          -- Personalized Nodes

-- View the Personalized PageRank of all vertices, sorted by their scores.
SELECT * FROM pagerank_out ORDER BY pagerank DESC;

-- View the summary table to find the number of iterations required for
-- convergence.
SELECT * FROM pagerank_out_summary;
"""
    else:
        help_string = """
----------------------------------------------------------------------------
                                SUMMARY
----------------------------------------------------------------------------
Given a directed graph, pagerank algorithm finds the PageRank score of all
the vertices in the graph.
--
For an overview on usage, run:
SELECT {schema_madlib}.pagerank('usage');
For some examples, run:
SELECT {schema_madlib}.pagerank('example')
--
"""
    return help_string.format(schema_madlib=schema_madlib)
# ---------------------------------------------------------------------