# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# PageRank
# Please refer to the pagerank.sql_in file for the documentation
"""
@file pagerank.py_in
@namespace graph
"""
import plpy
from utilities.control import MinWarning
from utilities.utilities import _assert
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string, split_quoted_delimited_str
from utilities.validate_args import columns_exist_in_table, get_cols_and_types
from graph_utils import *
m4_changequote(`<!', `!>')
def validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
edge_params, out_table, damping_factor, max_iter, threshold,
grouping_cols_list, module_name):
"""
Function to validate input parameters for PageRank
"""
validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
out_table, module_name)
    _assert(0.0 <= damping_factor <= 1.0,
            """PageRank: Invalid damping factor value ({0}), must be between 0 and 1.""".
            format(damping_factor))
    _assert(not threshold or 0.0 <= threshold <= 1.0,
            """PageRank: Invalid threshold value ({0}), must be between 0 and 1.""".
            format(threshold))
_assert(max_iter > 0,
"""PageRank: Invalid max_iter value ({0}), must be a positive integer.""".
format(max_iter))
if grouping_cols_list:
        # Validate the grouping columns. Only plain column names in the
        # edge_table are currently supported as grouping_cols; expressions
        # are not.
_assert(columns_exist_in_table(edge_table, grouping_cols_list, schema_madlib),
"PageRank error: One or more grouping columns specified do not exist!")
def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
out_table, damping_factor, max_iter, threshold, grouping_cols, **kwargs):
"""
Function that computes the PageRank
Args:
@param vertex_table
@param vertex_id
@param edge_table
@param source_vertex
@param dest_vertex
@param out_table
@param damping_factor
@param max_iter
@param threshold
"""
old_msg_level = plpy.execute("""
SELECT setting
FROM pg_settings
WHERE name='client_min_messages'
""")[0]['setting']
plpy.execute('SET client_min_messages TO warning')
params_types = {'src': str, 'dest': str}
default_args = {'src': 'src', 'dest': 'dest'}
edge_params = extract_keyvalue_params(edge_args, params_types, default_args)
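    # edge_args is a comma-delimited string of key=value pairs naming the
    # source and destination columns of the edge table, e.g.
    # 'src=src, dest=dest' (see the usage examples in pagerank_help).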
# populate default values for optional params if null
if damping_factor is None:
damping_factor = 0.85
if max_iter is None:
max_iter = 100
if vertex_id is None:
vertex_id = "id"
if not grouping_cols:
grouping_cols = ''
grouping_cols_list = split_quoted_delimited_str(grouping_cols)
validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
edge_params, out_table, damping_factor, max_iter, threshold,
grouping_cols_list, 'PageRank')
summary_table = out_table + "_summary"
_assert(not table_exists(summary_table),
"Graph PageRank: Output summary table ({summary_table}) already exists."
.format(**locals()))
src = edge_params["src"]
dest = edge_params["dest"]
nvertices = plpy.execute("""
SELECT COUNT({0}) AS cnt
FROM {1}
""".format(vertex_id, vertex_table))[0]["cnt"]
    # A fixed threshold value, of say 1e-5, might not work well when the
    # number of vertices is a billion, since the initial pagerank value
    # of every node would then be 1/1e9 = 1e-9. So, assign a default
    # threshold value based on the number of nodes in the graph.
    # NOTE: The heuristic below is not based on any scientific evidence.
if threshold is None:
threshold = 1.0/(nvertices*100)
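    # For example, a graph with 1,000 vertices gets a default threshold of
    # 1/(1000*100) = 1e-5, while a billion-vertex graph gets 1e-11, well
    # below its initial per-vertex pagerank of 1e-9.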
# table/column names used when grouping_cols is set.
distinct_grp_table = ''
vertices_per_group = ''
vpg = ''
grouping_where_clause = ''
group_by_clause = ''
random_prob = ''
edge_temp_table = unique_string(desp='temp_edge')
distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
<!"DISTRIBUTED BY ({0})".format(dest)!>)
plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table))
plpy.execute("""CREATE TEMP TABLE {edge_temp_table} AS
SELECT * FROM {edge_table}
{distribution}
""".format(**locals()))
    # GPDB and HAWQ rely on the DISTRIBUTED BY clause above for data
    # placement; for PostgreSQL we add an index explicitly instead.
sql_index = m4_ifdef(<!__POSTGRESQL__!>,
<!"""CREATE INDEX ON {edge_temp_table} ({src});
""".format(**locals())!>,
<!''!>)
plpy.execute(sql_index)
# Intermediate tables required.
cur = unique_string(desp='cur')
message = unique_string(desp='message')
cur_unconv = unique_string(desp='cur_unconv')
message_unconv = unique_string(desp='message_unconv')
out_cnts = unique_string(desp='out_cnts')
out_cnts_cnt = unique_string(desp='cnt')
v1 = unique_string(desp='v1')
cnts_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
<!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
cur_join_clause = """{edge_temp_table}.{dest}={cur}.{vertex_id}
""".format(**locals())
out_cnts_join_clause = """{out_cnts}.{vertex_id}={edge_temp_table}.{src}
""".format(**locals())
v1_join_clause = """{v1}.{vertex_id}={edge_temp_table}.{src}
""".format(**locals())
random_probability = (1.0-damping_factor)/nvertices
######################################################################
# Create several strings that will be used to construct required
# queries. These strings will be required only during grouping.
random_jump_prob = random_probability
ignore_group_clause_first = ''
limit = ' LIMIT 1 '
grouping_cols_select_pr = ''
vertices_per_group_inner_join_pr = ''
    ignore_group_clause_pr = ''
grouping_cols_select_ins = ''
vpg_from_clause_ins = ''
vpg_where_clause_ins = ''
message_grp_where_ins = ''
ignore_group_clause_ins = ''
grouping_cols_select_conv = '{0}.{1}'.format(cur, vertex_id)
group_by_grouping_cols_conv = ''
message_grp_clause_conv = ''
ignore_group_clause_conv = ''
######################################################################
    # Queries involving groups need many more conditions in their various
    # clauses, so populate the required variables here. Some intermediate
    # tables and columns are unnecessary when no grouping is involved, so
    # create them only during grouping.
if grouping_cols:
distinct_grp_table = unique_string(desp='grp')
plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
plpy.execute("""CREATE TEMP TABLE {distinct_grp_table} AS
SELECT DISTINCT {grouping_cols} FROM {edge_table}
""".format(**locals()))
vertices_per_group = unique_string(desp='nvert_grp')
init_pr = unique_string(desp='init')
random_prob = unique_string(desp='rand')
subq = unique_string(desp='subquery')
rand_damp = 1-damping_factor
grouping_where_clause = ' AND '.join(
[distinct_grp_table+'.'+col+'='+subq+'.'+col
for col in grouping_cols_list])
group_by_clause = ', '.join([distinct_grp_table+'.'+col
for col in grouping_cols_list])
        # Find the number of vertices in each group; this is the
        # normalizing factor for computing the random_prob.
plpy.execute("DROP TABLE IF EXISTS {0}".format(vertices_per_group))
plpy.execute("""CREATE TEMP TABLE {vertices_per_group} AS
SELECT {distinct_grp_table}.*,
1/COUNT(__vertices__)::DOUBLE PRECISION AS {init_pr},
{rand_damp}/COUNT(__vertices__)::DOUBLE PRECISION AS {random_prob}
FROM {distinct_grp_table} INNER JOIN (
SELECT {grouping_cols}, {src} AS __vertices__
FROM {edge_table}
UNION
SELECT {grouping_cols}, {dest} FROM {edge_table}
){subq}
ON {grouping_where_clause}
GROUP BY {group_by_clause}
""".format(**locals()))
grouping_where_clause = ' AND '.join(
[vertices_per_group+'.'+col+'='+subq+'.'+col
for col in grouping_cols_list])
group_by_clause = ', '.join([vertices_per_group+'.'+col
for col in grouping_cols_list])
plpy.execute("""
CREATE TEMP TABLE {cur} AS
SELECT {group_by_clause}, {subq}.__vertices__ as {vertex_id},
{init_pr} AS pagerank
FROM {vertices_per_group} INNER JOIN (
SELECT {grouping_cols}, {src} AS __vertices__
FROM {edge_table}
UNION
SELECT {grouping_cols}, {dest} FROM {edge_table}
){subq}
ON {grouping_where_clause}
""".format(**locals()))
vpg = unique_string(desp='vpg')
# Compute the out-degree of every node in the group-based subgraphs.
plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
SELECT {grouping_cols_select} {src} AS {vertex_id},
COUNT({dest}) AS {out_cnts_cnt}
FROM {edge_table}
GROUP BY {grouping_cols_select} {src}
{cnts_distribution}
""".format(grouping_cols_select=grouping_cols+','
if grouping_cols else '', **locals()))
message_grp = ' AND '.join(
["{cur}.{col}={message}.{col}".format(**locals())
for col in grouping_cols_list])
cur_join_clause = cur_join_clause + ' AND ' + ' AND '.join(
["{edge_temp_table}.{col}={cur}.{col}".format(**locals())
for col in grouping_cols_list])
out_cnts_join_clause = out_cnts_join_clause + ' AND ' + ' AND '.join(
["{edge_temp_table}.{col}={out_cnts}.{col}".format(**locals())
for col in grouping_cols_list])
v1_join_clause = v1_join_clause + ' AND ' + ' AND '.join(
["{edge_temp_table}.{col}={v1}.{col}".format(**locals())
for col in grouping_cols_list])
vpg_join_clause = ' AND '.join(
["{edge_temp_table}.{col}={vpg}.{col}".format(**locals())
for col in grouping_cols_list])
vpg_cur_join_clause = ' AND '.join(
["{cur}.{col}={vpg}.{col}".format(**locals())
for col in grouping_cols_list])
# join clause specific to populating random_prob for nodes without any
# incoming edges.
edge_grouping_cols_select = ', '.join(
["{edge_temp_table}.{col}".format(**locals())
for col in grouping_cols_list])
cur_grouping_cols_select = ', '.join(
["{cur}.{col}".format(**locals()) for col in grouping_cols_list])
# Create output summary table:
cols_names_types = get_cols_and_types(edge_table)
grouping_cols_clause = ', '.join([c_name+" "+c_type
for (c_name, c_type) in cols_names_types
if c_name in grouping_cols_list])
plpy.execute("""
CREATE TABLE {summary_table} (
{grouping_cols_clause},
__iterations__ INTEGER
)
""".format(**locals()))
        # Create the output table. This will be updated whenever a group
        # converges. Note that vertex_id is assumed to be an integer (as
        # described in the documentation).
plpy.execute("""
CREATE TABLE {out_table} (
{grouping_cols_clause},
{vertex_id} INTEGER,
pagerank DOUBLE PRECISION
)
""".format(**locals()))
temp_summary_table = unique_string(desp='temp_summary')
plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_summary_table))
plpy.execute("""
CREATE TABLE {temp_summary_table} (
{grouping_cols_clause}
)
""".format(**locals()))
######################################################################
# Strings required for the main PageRank computation query
grouping_cols_select_pr = edge_grouping_cols_select+', '
random_jump_prob = 'MIN({vpg}.{random_prob})'.format(**locals())
vertices_per_group_inner_join_pr = """INNER JOIN {vertices_per_group}
AS {vpg} ON {vpg_join_clause}""".format(**locals())
        ignore_group_clause_pr = ' WHERE ' + get_ignore_groups(
            summary_table, edge_temp_table, grouping_cols_list)
# Strings required for updating PageRank scores of vertices that have
# no incoming edges
grouping_cols_select_ins = cur_grouping_cols_select+','
vpg_from_clause_ins = ', {vertices_per_group} AS {vpg}'.format(
**locals())
vpg_where_clause_ins = '{vpg_cur_join_clause} AND '.format(
**locals())
message_grp_where_ins = 'WHERE {message_grp}'.format(**locals())
        ignore_group_clause_ins = ' AND ' + get_ignore_groups(
            summary_table, cur, grouping_cols_list)
# Strings required for convergence test query
grouping_cols_select_conv = cur_grouping_cols_select
group_by_grouping_cols_conv = ' GROUP BY {0}'.format(
cur_grouping_cols_select)
message_grp_clause_conv = '{0} AND '.format(message_grp)
        ignore_group_clause_conv = ' AND ' + get_ignore_groups(
            summary_table, cur, grouping_cols_list)
limit = ''
else:
# cur and out_cnts tables can be simpler when no grouping is involved.
init_value = 1.0/nvertices
plpy.execute("""
CREATE TEMP TABLE {cur} AS
SELECT {vertex_id}, {init_value}::DOUBLE PRECISION AS pagerank
FROM {vertex_table}
""".format(**locals()))
# Compute the out-degree of every node in the graph.
plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
SELECT {src} AS {vertex_id}, COUNT({dest}) AS {out_cnts_cnt}
FROM {edge_table}
GROUP BY {src}
{cnts_distribution}
""".format(**locals()))
        # When there is no grouping, the summary table contains only the
        # iteration column. We also don't need to create the out_table,
        # since the 'cur' table is renamed to out_table once the pagerank
        # computation completes.
plpy.execute("""
CREATE TABLE {summary_table} (
__iterations__ INTEGER
)
""".format(**locals()))
unconverged = 0
iteration_num = 0
for iteration_num in range(max_iter):
        #####################################################################
        # The PageRank of node 'A' at iteration 'i' is given by:
        #   PR_i(A) = damping_factor * (PR_{i-1}(B)/degree(B) +
        #             PR_{i-1}(C)/degree(C) + ...) + (1-damping_factor)/N
        # where 'N' is the number of vertices in the graph,
        # B, C, ... are the nodes that have edges pointing to node A, and
        # degree(node) is the number of outgoing edges of 'node'.
        #####################################################################
        # Essentially, the pagerank of a node aggregates a fraction of the
        # pagerank values of all the nodes that have edges pointing to it,
        # plus a small random-jump probability.
        # More information can be found at:
        # https://en.wikipedia.org/wiki/PageRank#Damping_factor
        # The query below computes the PageRank of each node using the
        # above formula. A note on ignore_group_clause: it is used only
        # when grouping is set, and holds the condition that skips the
        # PageRank computation for groups that have already converged.
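        # As a worked example with damping_factor = 0.85 and N = 4: a node
        # A with incoming edges from B (pagerank 0.2, out-degree 2) and C
        # (pagerank 0.1, out-degree 1) gets
        #   PR(A) = 0.85*(0.2/2 + 0.1/1) + 0.15/4 = 0.17 + 0.0375 = 0.2075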
plpy.execute("""
CREATE TABLE {message} AS
SELECT {grouping_cols_select_pr} {edge_temp_table}.{dest} AS {vertex_id},
SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_jump_prob} AS pagerank
FROM {edge_temp_table}
INNER JOIN {cur} ON {cur_join_clause}
INNER JOIN {out_cnts} ON {out_cnts_join_clause}
INNER JOIN {cur} AS {v1} ON {v1_join_clause}
{vertices_per_group_inner_join_pr}
{ignore_group_clause}
GROUP BY {grouping_cols_select_pr} {edge_temp_table}.{dest}
""".format(ignore_group_clause=ignore_group_clause_pr
if iteration_num>0 else ignore_group_clause_first,
**locals()))
# If there are nodes that have no incoming edges, they are not
# captured in the message table. Insert entries for such nodes,
# with random_prob.
plpy.execute("""
INSERT INTO {message}
SELECT {grouping_cols_select_ins} {cur}.{vertex_id},
{random_jump_prob} AS pagerank
FROM {cur} {vpg_from_clause_ins}
WHERE {vpg_where_clause_ins} {vertex_id} NOT IN (
SELECT {vertex_id}
FROM {message}
{message_grp_where_ins}
)
{ignore_group_clause}
GROUP BY {grouping_cols_select_ins} {cur}.{vertex_id}
""".format(ignore_group_clause=ignore_group_clause_ins
if iteration_num>0 else ignore_group_clause_first,
**locals()))
        # Check for convergence, but only if threshold != 0.
        if threshold != 0:
            # message_unconv and cur_unconv contain the unconverged groups
            # after the current and previous iterations respectively.
            # Groups that are missing from message_unconv but appear in
            # cur_unconv are the groups that converged during this
            # iteration's computation. If no grouping columns are
            # specified, we check whether at least one node is still
            # unconverged (hence the LIMIT 1 in the query).
plpy.execute("""
CREATE TEMP TABLE {message_unconv} AS
SELECT {grouping_cols_select_conv}
FROM {message}
INNER JOIN {cur}
ON {cur}.{vertex_id}={message}.{vertex_id}
WHERE {message_grp_clause_conv}
ABS({cur}.pagerank-{message}.pagerank) > {threshold}
{ignore_group_clause}
{group_by_grouping_cols_conv}
{limit}
""".format(ignore_group_clause=ignore_group_clause_ins
if iteration_num>0 else ignore_group_clause_conv,
**locals()))
unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
""".format(message_unconv))[0]["cnt"]
            if iteration_num > 0 and grouping_cols:
                # Update the result and summary tables for groups that
                # have converged since the last iteration.
update_result_tables(temp_summary_table, iteration_num,
summary_table, out_table, message, grouping_cols_list,
cur_unconv, message_unconv)
plpy.execute("DROP TABLE IF EXISTS {0}".format(cur_unconv))
plpy.execute("""ALTER TABLE {message_unconv} RENAME TO
{cur_unconv} """.format(**locals()))
else:
# Do not run convergence test if threshold=0, since that implies
# the user wants to run max_iter iterations.
unconverged = 1
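        # The freshly computed scores become the input of the next
        # iteration: drop the old cur and promote message to cur.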
plpy.execute("DROP TABLE IF EXISTS {0}".format(cur))
plpy.execute("""ALTER TABLE {message} RENAME TO {cur}
""".format(**locals()))
if unconverged == 0:
break
    # If some groups (or the entire table, when there is no grouping) are
    # still unconverged, update the results.
if grouping_cols:
        if unconverged > 0:
            if threshold != 0:
                # We completed max_iter iterations, but some groups still
                # have not converged. Update the result and summary tables
                # for the unconverged groups.
update_result_tables(temp_summary_table, iteration_num,
summary_table, out_table, cur, grouping_cols_list,
cur_unconv)
            else:
                # threshold=0 means the convergence test was never run, so
                # no group has converged; the list of all group values is
                # in distinct_grp_table.
update_result_tables(temp_summary_table, iteration_num,
summary_table, out_table, cur, grouping_cols_list,
distinct_grp_table)
else:
plpy.execute("""ALTER TABLE {table_name} RENAME TO {out_table}
""".format(table_name=cur, **locals()))
plpy.execute("""
INSERT INTO {summary_table} VALUES
({iteration_num}+1);
""".format(**locals()))
    # Cleanup: drop all intermediate tables.
plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5};
""".format(out_cnts, edge_temp_table, cur, message, cur_unconv,
message_unconv))
if grouping_cols:
plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2};
""".format(vertices_per_group, temp_summary_table,
distinct_grp_table))
plpy.execute("SET client_min_messages TO %s" % old_msg_level)
def update_result_tables(temp_summary_table, i, summary_table, out_table,
res_table, grouping_cols_list, cur_unconv, message_unconv=None):
"""
    This function updates the summary and output tables only for those
    groups that have converged. These are found by looking at groups
    that appear in cur_unconv but not in message_unconv: message_unconv
    contains the groups that have not converged in the current iteration,
    while cur_unconv contains the groups that had not converged in the
    previous iteration. The entries in cur_unconv are a superset of the
    entries in message_unconv, so the difference between the two tables
    gives the groups that converged in the current iteration.
"""
plpy.execute("TRUNCATE TABLE {0}".format(temp_summary_table))
if message_unconv is None:
        # If this function is called after max_iter is completed without
        # convergence, all the unconverged groups from cur_unconv are used
        # (note that message_unconv is renamed to cur_unconv before the
        # unconverged == 0 check in the pagerank function's for loop).
plpy.execute("""
INSERT INTO {temp_summary_table}
SELECT * FROM {cur_unconv}
""".format(**locals()))
else:
plpy.execute("""
INSERT INTO {temp_summary_table}
SELECT {cur_unconv}.*
FROM {cur_unconv}
WHERE {join_condition}
""".format(join_condition=get_ignore_groups(
message_unconv, cur_unconv, grouping_cols_list), **locals()))
plpy.execute("""
INSERT INTO {summary_table}
        SELECT *, {i}+1 AS __iterations__
FROM {temp_summary_table}
""".format(**locals()))
plpy.execute("""
INSERT INTO {out_table}
SELECT {res_table}.*
FROM {res_table}
INNER JOIN {temp_summary_table}
ON {join_condition}
""".format(join_condition=' AND '.join(
["{res_table}.{col}={temp_summary_table}.{col}".format(
**locals())
for col in grouping_cols_list]), **locals()))
def get_ignore_groups(first_table, second_table, grouping_cols_list):
"""
This function generates the necessary clause to only select the
groups that appear in second_table and not in first_table.
"""
return """({second_table_cols}) NOT IN (SELECT {grouping_cols} FROM
{first_table}) """.format(second_table_cols=', '.join(
["{second_table}.{col}".format(**locals())
for col in grouping_cols_list]),
                   grouping_cols=', '.join(grouping_cols_list),
**locals())
def pagerank_help(schema_madlib, message, **kwargs):
"""
Help function for pagerank
Args:
@param schema_madlib
@param message: string, Help message string
@param kwargs
Returns:
String. Help/usage information
"""
if message is not None and \
message.lower() in ("usage", "help", "?"):
help_string = "Get from method below"
help_string = get_graph_usage(schema_madlib, 'PageRank',
"""out_table TEXT, -- Name of the output table for PageRank
damping_factor DOUBLE PRECISION, -- Damping factor in random surfer model
-- (DEFAULT = 0.85)
max_iter INTEGER, -- Maximum iteration number (DEFAULT = 100)
threshold DOUBLE PRECISION, -- Stopping criteria (DEFAULT = 1/(N*100),
-- N is number of vertices in the graph)
grouping_col TEXT -- Comma-separated column names to group on
-- (DEFAULT = NULL, no grouping)
""") + """
A summary table is also created that contains information regarding the
number of iterations required for convergence. It is named by adding the
suffix '_summary' to the 'out_table' parameter.
"""
else:
if message is not None and \
message.lower() in ("example", "examples"):
help_string = """
----------------------------------------------------------------------------
EXAMPLES
----------------------------------------------------------------------------
-- Create a graph, represented as vertex and edge tables.
DROP TABLE IF EXISTS vertex, edge;
CREATE TABLE vertex(
id INTEGER
);
CREATE TABLE edge(
src INTEGER,
dest INTEGER,
user_id INTEGER
);
INSERT INTO vertex VALUES
(0),
(1),
(2),
(3),
(4),
(5),
(6);
INSERT INTO edge VALUES
(0, 1, 1),
(0, 2, 1),
(0, 4, 1),
(1, 2, 1),
(1, 3, 1),
(2, 3, 1),
(2, 5, 1),
(2, 6, 1),
(3, 0, 1),
(4, 0, 1),
(5, 6, 1),
(6, 3, 1),
(0, 1, 2),
(0, 2, 2),
(0, 4, 2),
(1, 2, 2),
(1, 3, 2),
(2, 3, 2),
(3, 0, 2),
(4, 0, 2),
(5, 6, 2),
(6, 3, 2);
-- Compute the PageRank:
DROP TABLE IF EXISTS pagerank_out, pagerank_out_summary;
SELECT madlib.pagerank(
'vertex', -- Vertex table
'id', -- Vertex id column
'edge', -- Edge table
'src=src, dest=dest', -- Comma-delimited string of edge arguments
'pagerank_out'); -- Output table of PageRank
-- View the PageRank of all vertices, sorted by their scores.
SELECT * FROM pagerank_out ORDER BY pagerank DESC;
-- View the summary table to find the number of iterations required for
-- convergence.
SELECT * FROM pagerank_out_summary;
-- Compute PageRank of nodes associated with each user:
DROP TABLE IF EXISTS pagerank_out, pagerank_out_summary;
SELECT madlib.pagerank(
'vertex', -- Vertex table
'id', -- Vertex id column
'edge', -- Edge table
'src=src, dest=dest', -- Comma-delimited string of edge arguments
'pagerank_out', -- Output table of PageRank
NULL, -- Default damping factor
NULL, -- Default max_iter
0.00000001, -- Threshold
'user_id'); -- Grouping column
-- View the PageRank of all vertices, sorted by their scores.
SELECT * FROM pagerank_out ORDER BY user_id, pagerank DESC;
-- View the summary table to find the number of iterations required for
-- convergence for each group.
SELECT * FROM pagerank_out_summary;
"""
else:
help_string = """
----------------------------------------------------------------------------
SUMMARY
----------------------------------------------------------------------------
Given a directed graph, the PageRank algorithm computes the PageRank score
of every vertex in the graph.
--
For an overview on usage, run:
SELECT {schema_madlib}.pagerank('usage');
For some examples, run:
SELECT {schema_madlib}.pagerank('example');
--
"""
return help_string.format(schema_madlib=schema_madlib)
# ---------------------------------------------------------------------