Pagerank: Remove duplicate entries from grouping output
JIRA: MADLIB-1229
JIRA: MADLIB-1253
This commit fixes the missing output for complete graphs bug as well.
Closes #294
Co-authored-by: Orhan Kislal <okislal@pivotal.io>
diff --git a/src/ports/postgres/modules/graph/graph_utils.py_in b/src/ports/postgres/modules/graph/graph_utils.py_in
index 8a41560..b0eaee4 100644
--- a/src/ports/postgres/modules/graph/graph_utils.py_in
+++ b/src/ports/postgres/modules/graph/graph_utils.py_in
@@ -128,11 +128,13 @@
format(func_name))
def update_output_grouping_tables_for_link_analysis(temp_summary_table,
- iter_num, summary_table,
- out_table, res_table,
- grouping_cols_list,
- cur_unconv,
- message_unconv=None):
+ iter_num,
+ summary_table,
+ out_table,
+ res_table,
+ grouping_cols_list,
+ cur_unconv,
+ message_unconv=None):
"""
This function updates the summary and output tables only for those
groups that have converged. This is found out by looking at groups
diff --git a/src/ports/postgres/modules/graph/pagerank.py_in b/src/ports/postgres/modules/graph/pagerank.py_in
index 31377c0..71cddd2 100644
--- a/src/ports/postgres/modules/graph/pagerank.py_in
+++ b/src/ports/postgres/modules/graph/pagerank.py_in
@@ -455,6 +455,7 @@
WHERE __t1__.{src}=__t2__.{dest}
AND {where_group_clause}
) {vpg_where_clause_ins}
+ GROUP BY {select_group_cols}, {vertex_id}, pagerank
""".format(
select_group_cols=get_table_qualified_col_str(
'__t1__', grouping_cols_list),
@@ -526,6 +527,7 @@
# This is used only when grouping is set. This essentially will have
# the condition that will help skip the PageRank computation on groups
# that have converged.
+
plpy.execute("""
CREATE TABLE {message} AS
SELECT {grouping_cols_select_pr}
@@ -543,6 +545,7 @@
""".format(ignore_group_clause=ignore_group_clause_pr
if iteration_num > 0 else ignore_group_clause_first,
**locals()))
+
# If there are nodes that have no incoming edges, they are not
# captured in the message table. Insert entries for such nodes,
# with random_prob.
@@ -564,7 +567,19 @@
# If no grouping columns are specified, then we check if there is
# at least one unconverged node (limit 1 is used in the
# query).
- plpy.execute("""
+ if iteration_num == 0 and grouping_cols:
+ # Hack to address corner case:
+ # With grouping, if there was a graph that converged in
+ # the very first iteration (a complete graph is an eg.
+ # of such a graph), then the pagerank scores for that
+ # group was not showing up in the output. The following
+ # code just prevents convergence in the first iteration.
+ plpy.execute("""
+ CREATE TEMP TABLE {message_unconv} AS
+ SELECT * FROM {distinct_grp_table}
+ """.format(**locals()))
+ else:
+ plpy.execute("""
CREATE TEMP TABLE {message_unconv} AS
SELECT {grouping_cols_select_conv}
FROM {message}
@@ -578,19 +593,23 @@
""".format(ignore_group_clause=ignore_group_clause_ins
if iteration_num > 0 else ignore_group_clause_conv,
**locals()))
+
unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
""".format(message_unconv))[0]["cnt"]
if iteration_num > 0 and grouping_cols:
# Update result and summary tables for groups that have
# converged
# since the last iteration.
- update_output_grouping_tables_for_link_analysis(temp_summary_table,
- iteration_num,
- summary_table,
- out_table, message,
- grouping_cols_list,
- cur_unconv,
- message_unconv)
+ update_output_grouping_tables_for_link_analysis(
+ temp_summary_table,
+ iteration_num,
+ summary_table,
+ out_table,
+ message,
+ grouping_cols_list,
+ cur_unconv,
+ message_unconv)
+
plpy.execute("DROP TABLE IF EXISTS {0}".format(cur_unconv))
plpy.execute("""ALTER TABLE {message_unconv} RENAME TO
{cur_unconv} """.format(**locals()))
@@ -613,21 +632,24 @@
# We completed max_iters, but there are still some unconverged
# groups # Update the result and summary tables for unconverged
# groups.
- update_output_grouping_tables_for_link_analysis(temp_summary_table,
- iteration_num,
- summary_table,
- out_table, cur,
- grouping_cols_list,
- cur_unconv)
+ update_output_grouping_tables_for_link_analysis(
+ temp_summary_table,
+ iteration_num,
+ summary_table,
+ out_table,
+ cur,
+ grouping_cols_list,
+ cur_unconv)
else:
# No group has converged. List of all group values are in
# distinct_grp_table.
- update_output_grouping_tables_for_link_analysis(temp_summary_table,
- iteration_num,
- summary_table,
- out_table, cur,
- grouping_cols_list,
- distinct_grp_table)
+ update_output_grouping_tables_for_link_analysis(
+ temp_summary_table,
+ iteration_num,
+ summary_table,
+ out_table, cur,
+ grouping_cols_list,
+ distinct_grp_table)
# updating the calculated pagerank value in case of
# Personalized Page Rank.
@@ -639,15 +661,16 @@
""".format(out_table=out_table,
total_ppr_nodes=total_ppr_nodes))
else:
- # updating the calculated pagerank value in case of
- # Personalized Page Rank.
- # Ref :
- # https://docs.oracle.com/cd/E56133_01/latest/reference/algorithms/pagerank.html
+ # updating the calculated pagerank value in case of
+ # Personalized Page Rank.
+ # Ref :
+ # https://docs.oracle.com/cd/E56133_01/latest/reference/algorithms/pagerank.html
if total_ppr_nodes > 1:
plpy.execute("""UPDATE {table_name} set pagerank =
pagerank / {total_ppr_nodes}::DOUBLE PRECISION
""".format(table_name=cur,
total_ppr_nodes=total_ppr_nodes))
+
plpy.execute("""
ALTER TABLE {table_name}
RENAME TO {out_table}
diff --git a/src/ports/postgres/modules/graph/test/pagerank.sql_in b/src/ports/postgres/modules/graph/test/pagerank.sql_in
index 4b93075..14d3371 100644
--- a/src/ports/postgres/modules/graph/test/pagerank.sql_in
+++ b/src/ports/postgres/modules/graph/test/pagerank.sql_in
@@ -149,3 +149,43 @@
-- SELECT assert(relative_error(__iterations__, 31) = 0,
-- 'PageRank: Incorrect iterations for group 2.'
-- ) FROM pagerank_gr_out_summary WHERE user_id=2;
+
+-- Test to capture corner case reported in https://issues.apache.org/jira/browse/MADLIB-1229
+
+DROP TABLE IF EXISTS vertex, "EDGE";
+CREATE TABLE vertex(
+id INTEGER
+);
+CREATE TABLE "EDGE"(
+src INTEGER,
+dest INTEGER,
+user_id INTEGER
+);
+INSERT INTO vertex VALUES
+(0),
+(1),
+(2);
+INSERT INTO "EDGE" VALUES
+(0, 1, 1),
+(0, 2, 1),
+(1, 2, 1),
+(2, 1, 1),
+(0, 1, 2);
+
+
+DROP TABLE IF EXISTS pagerank_gr_out;
+DROP TABLE IF EXISTS pagerank_gr_out_summary;
+SELECT pagerank(
+'vertex', -- Vertex table
+'id', -- Vertix id column
+'"EDGE"', -- "EDGE" table
+'src=src, dest=dest', -- "EDGE" args
+'pagerank_gr_out', -- Output table of PageRank
+NULL, -- Default damping factor (0.85)
+NULL, -- Default max iters (100)
+NULL, -- Default Threshold
+'user_id');
+
+SELECT assert(relative_error(SUM(pagerank), 1) < 0.00001,
+ 'PageRank: Scores do not sum up to 1 for group 1.'
+ ) FROM pagerank_gr_out WHERE user_id=1;