# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# HITS
# Please refer to the hits.sql_in file for the documentation
"""
@file hits.py_in
@namespace graph
"""
import plpy
from graph_utils import get_graph_usage
from graph_utils import get_default_threshold_for_link_analysis
from graph_utils import update_output_grouping_tables_for_link_analysis
from graph_utils import validate_graph_coding
from graph_utils import validate_params_for_link_analysis
from utilities.control import MinWarning
from utilities.utilities import _assert
from utilities.utilities import _check_groups
from utilities.utilities import get_table_qualified_col_str
from utilities.utilities import add_postfix
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string, split_quoted_delimited_str
from utilities.utilities import is_platform_pg
from utilities.validate_args import columns_exist_in_table, drop_tables
from utilities.validate_args import get_cols_and_types, table_exists
from utilities.utilities import rename_table
def validate_hits_args(schema_madlib, vertex_table, vertex_id, edge_table,
edge_params, out_table, max_iter, threshold,
grouping_cols_list=None):
"""
Function to validate input parameters for HITS
"""
validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
out_table, 'HITS')
# Validate args such as threshold and max_iter
validate_params_for_link_analysis(schema_madlib, "HITS",
threshold, max_iter,
edge_table, grouping_cols_list)
def hits(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
out_table, max_iter, threshold, grouping_cols, **kwargs):
"""
Function that computes the HITS scores
Args:
@param schema_madlib:
@param vertex_table:
@param vertex_id:
@param edge_table:
@param edge_args:
@param out_table:
@param max_iter:
@param threshold:
@param grouping_cols:
@param kwargs:
"""
with MinWarning('warning'):
params_types = {'src': str, 'dest': str}
default_args = {'src': 'src', 'dest': 'dest'}
edge_params = extract_keyvalue_params(
edge_args, params_types, default_args)
# populate default values for optional params if null
if max_iter is None:
max_iter = 100
if not vertex_id:
vertex_id = "id"
if not grouping_cols:
grouping_cols = ''
grouping_cols_list = split_quoted_delimited_str(grouping_cols)
validate_hits_args(schema_madlib, vertex_table, vertex_id, edge_table,
edge_params, out_table, max_iter, threshold,
grouping_cols_list)
summary_table = add_postfix(out_table, "_summary")
        _assert(not table_exists(summary_table),
                "Graph HITS: Output summary table ({summary_table}) "
                "already exists.".format(**locals()))
src = edge_params["src"]
dest = edge_params["dest"]
n_vertices = plpy.execute("""
SELECT COUNT({0}) AS cnt
FROM {1}
""".format(vertex_id, vertex_table))[0]["cnt"]
if threshold is None:
threshold = get_default_threshold_for_link_analysis(n_vertices)
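        # For reference, the documented default (see the help text below) is
        # 1/(N*1000): e.g. a graph with N = 500 vertices would get a
        # threshold of 1.0 / (500 * 1000) = 2e-6, assuming
        # get_default_threshold_for_link_analysis implements that formula.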
        # Intermediate table/column names (some are only used when
        # grouping_cols is set).
edge_temp_table = unique_string(desp='temp_edge')
grouping_cols_comma = grouping_cols + ',' if grouping_cols else ''
distribution = ('' if is_platform_pg() else
"DISTRIBUTED BY ({0}{1})".format(
grouping_cols_comma, dest))
drop_tables([edge_temp_table])
plpy.execute("""
CREATE TEMP TABLE {edge_temp_table} AS
SELECT * FROM {edge_table}
{distribution}
""".format(**locals()))
######################################################################
        # Initialize all authority and hub scores to 1 so that the L2 norms
        # computed later are strictly positive.
authority_init_value = 1.0
hub_init_value = 1.0
subquery1 = unique_string(desp='subquery1')
distinct_grp_table = ''
select_grouping_cols_comma = ''
select_subquery1_grouping_cols_comma = ''
group_by_clause = ''
grouping_cols_for_create_table = ''
grouping_cols_for_create_table_comma = ''
# This table is created only when grouping is used.
temp_summary_table = None
if grouping_cols:
distinct_grp_table = unique_string(desp='grp')
drop_tables([distinct_grp_table])
plpy.execute("""CREATE TEMP TABLE {distinct_grp_table} AS
SELECT DISTINCT {grouping_cols} FROM {edge_temp_table}
""".format(**locals()))
group_by_clause = get_table_qualified_col_str(subquery1, grouping_cols_list)
select_grouping_cols_comma = group_by_clause + ','
select_subquery1_grouping_cols_comma = grouping_cols + ','
group_by_clause = 'GROUP BY ' + grouping_cols + ',' + vertex_id
cols_names_types = get_cols_and_types(edge_table)
grouping_cols_for_create_table = ', '.join([c_name + " " + c_type
for (c_name, c_type)
in cols_names_types
if c_name in
grouping_cols_list])
grouping_cols_for_create_table_comma = \
grouping_cols_for_create_table + ','
temp_summary_table = unique_string(desp='temp_summary')
drop_tables([temp_summary_table])
plpy.execute("""
CREATE TEMP TABLE {temp_summary_table} (
{grouping_cols_for_create_table}
)
""".format(**locals()))
        # Create the output table. It is updated whenever a group converges.
        # Note that vertex_id is assumed to be an integer (as described in
        # the documentation).
plpy.execute("""
CREATE TABLE {out_table} (
{grouping_cols_for_create_table_comma}
{vertex_id} BIGINT,
authority DOUBLE PRECISION,
hub DOUBLE PRECISION
)
""".format(**locals()))
# Intermediate tables required.
cur = unique_string(desp='cur')
message = unique_string(desp='message')
# curalias and msgalias are used as aliases for current and
# message tables respectively during self joins
curalias = unique_string(desp='curalias')
msgalias = unique_string(desp='msgalias')
message_unconv = unique_string(desp='message_unconv')
subquery2 = unique_string(desp='subquery2')
        # Greenplum uses DISTRIBUTED BY clauses to co-locate rows for the
        # joins below; for Postgres we add an explicit index instead.
        if is_platform_pg():
            plpy.execute("CREATE INDEX ON {0}({1})".format(
                edge_temp_table, dest))
            cur_distribution = ''
        else:
            cur_distribution = "DISTRIBUTED BY ({0}{1})".format(
                grouping_cols_comma, vertex_id)
cur_join_clause = " {cur}.{vertex_id} = {edge_temp_table}.{dest}"\
.format(**locals())
curalias_join_clause = "{curalias}.{vertex_id} = {edge_temp_table}.{src}"\
.format(**locals())
drop_tables([cur])
plpy.execute("""
CREATE TEMP TABLE {cur} AS
SELECT {select_grouping_cols_comma} {subquery1}.{vertex_id},
{authority_init_value}::DOUBLE PRECISION AS authority,
{hub_init_value}::DOUBLE PRECISION AS hub
FROM (
SELECT {select_subquery1_grouping_cols_comma} {vertex_table}.{vertex_id}
FROM {edge_temp_table} JOIN {vertex_table}
ON {edge_temp_table}.{src}={vertex_table}.{vertex_id}
UNION
SELECT {select_subquery1_grouping_cols_comma} {vertex_table}.{vertex_id}
FROM {edge_temp_table} JOIN {vertex_table}
ON {edge_temp_table}.{dest}={vertex_table}.{vertex_id}
) {subquery1}
{group_by_clause}
{cur_distribution}
""".format(**locals()))
# The summary table contains the total number of iterations for each
# group
plpy.execute("""
CREATE TABLE {summary_table} (
{grouping_cols_for_create_table_comma}
__iterations__ INTEGER
)
""".format(**locals()))
# create message table
cur_grouping_cols_comma = ''
grouping_cols_join_condition_cur_and_edge = ''
grouping_cols_join_condition_cur_and_curalias = ''
# update message table
msg_grouping_cols_comma = ''
grouping_cols_where_clause_msg_subquery2 = ''
grouping_cols_join_condition_msg_and_edge = ''
grouping_cols_join_condition_msg_and_msgalias = ''
grouping_cols_where_condition_msg_subquery1 = ''
group_by_msg_grouping_cols = ''
# check for convergence
select_distinct_unconverged_rows = '{0}.{1}'.format(cur, vertex_id)
grouping_cols_join_condition_cur_and_msg = ''
if grouping_cols:
cur_grouping_cols = get_table_qualified_col_str(cur, grouping_cols_list)
cur_grouping_cols_comma = cur_grouping_cols + ','
grouping_cols_join_condition_cur_and_edge = ' AND ' + \
_check_groups(cur, edge_temp_table, grouping_cols_list)
grouping_cols_join_condition_cur_and_curalias = ' AND ' + \
_check_groups(cur, curalias, grouping_cols_list)
msg_grouping_cols = get_table_qualified_col_str(message, grouping_cols_list)
msg_grouping_cols_comma = msg_grouping_cols + ','
grouping_cols_where_clause_msg_subquery2 = ' AND ' + \
_check_groups(message, subquery2, grouping_cols_list)
grouping_cols_join_condition_msg_and_edge = ' AND ' + \
_check_groups(message, edge_temp_table, grouping_cols_list)
grouping_cols_join_condition_msg_and_msgalias = ' AND ' + \
_check_groups(message, msgalias, grouping_cols_list)
group_by_msg_grouping_cols = ' GROUP BY ' + msg_grouping_cols
grouping_cols_where_condition_msg_subquery1 = ' WHERE ' + \
_check_groups(message, subquery1, grouping_cols_list)
# this is used for the message_unconv table to find out how many groups
# have converged
select_distinct_unconverged_rows = cur_grouping_cols
grouping_cols_join_condition_cur_and_msg = ' AND ' + \
_check_groups(cur, message, grouping_cols_list)
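            # Illustration (assuming _check_groups builds pairwise column
            # comparisons): for grouping_cols = 'g1,g2' the clause above is
            # roughly " AND cur.g1 = message.g1 AND cur.g2 = message.g2", so
            # scores are only compared within the same group.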
# Variables common to both grouping and non-grouping cases
message_join_clause = "{message}.{vertex_id} = \
{edge_temp_table}.{src}".format(**locals())
msgalias_join_clause = "{msgalias}.{vertex_id} = {edge_temp_table}.{dest}".format(
**locals())
sum_norm_square_root = unique_string(desp='sum_norm_sqr_root')
cur_unconv = unique_string(desp='cur_unconv')
converged = False
iteration_num = 0
"""
We need to calculate the hub and authority scores for each iteration
and pass the current iteration values to the next iteration.
To achieve this, we use the following tables
1. cur -> This gets initialized with default values for both authority
and hub.
2. message -> This gets created for each iteration with newly
computed scores based on cur. At the end of each iteration, we rename
message to cur as a way to pass authority and hub values to the next
iteration.
This convention is similar to message passing paradigm in
distributed systems such as spark.
"""
for iteration_num in range(max_iter):
calculate_authority_and_hub_scores(**locals())
# Check for convergence only if threshold != 0.
if threshold != 0:
converged = check_for_convergence(**locals())
if iteration_num > 0 and grouping_cols:
# Update result and summary tables for groups that have
# converged
# since the last iteration.
update_output_grouping_tables_for_link_analysis(temp_summary_table,
iteration_num,
summary_table,
out_table,
message,
grouping_cols_list,
cur_unconv,
message_unconv)
drop_tables([cur_unconv])
plpy.execute("""ALTER TABLE {message_unconv} RENAME TO
{cur_unconv} """.format(**locals()))
drop_tables([cur])
plpy.execute("""ALTER TABLE {message} RENAME TO {cur}
""".format(**locals()))
drop_tables([sum_norm_square_root])
if converged:
break
update_final_results(schema_madlib, converged, threshold, cur, temp_summary_table,
iteration_num, summary_table, out_table,
grouping_cols_list, cur_unconv, distinct_grp_table)
        # Clean up all the intermediate tables.
drop_tables([cur, message, cur_unconv, message_unconv, edge_temp_table])
if grouping_cols:
drop_tables([distinct_grp_table, temp_summary_table])
def update_final_results(schema_madlib, converged, threshold, cur, temp_summary_table,
iteration_num, summary_table, out_table,
grouping_cols_list, cur_unconv, distinct_grp_table):
"""
    Update the result and summary tables for any converged/unconverged nodes
    (within groups, or for the entire table when there is no grouping) that
    remain after the main loop.
"""
if grouping_cols_list:
if not converged:
if threshold != 0:
                # We completed max_iter iterations, but some groups have
                # not converged. Update the result and summary tables for
                # the unconverged groups.
update_output_grouping_tables_for_link_analysis(temp_summary_table,
iteration_num,
summary_table,
out_table, cur,
grouping_cols_list,
cur_unconv)
else:
                # No group has converged. The list of all group values is in
                # distinct_grp_table.
update_output_grouping_tables_for_link_analysis(temp_summary_table,
iteration_num,
summary_table,
out_table, cur,
grouping_cols_list,
distinct_grp_table)
else:
rename_table(schema_madlib, cur, out_table)
plpy.execute("""
INSERT INTO {summary_table} VALUES
({iteration_num}+1)
""".format(**locals()))
def check_for_convergence(**kwargs):
    # message_unconv and cur_unconv contain the unconverged rows (vertices,
    # or groups when grouping_cols is set) after the current and previous
    # iterations, respectively. A row is unconverged if either of its scores
    # changed by more than the threshold; convergence is reached when the
    # count of such rows is zero.
plpy.execute("""
CREATE TABLE {message_unconv} AS
SELECT DISTINCT {select_distinct_unconverged_rows}
FROM {message}
INNER JOIN {cur}
ON {cur}.{vertex_id}={message}.{vertex_id}
{grouping_cols_join_condition_cur_and_msg}
WHERE ABS({cur}.authority-{message}.authority) > {threshold}
OR ABS({cur}.hub-{message}.hub) > {threshold}
""".format(**kwargs))
unconverged_node_num = plpy.execute("""
SELECT COUNT(*) AS cnt FROM {message_unconv}
""".format(**kwargs))[0]["cnt"]
return unconverged_node_num == 0
def calculate_authority_and_hub_scores(**kwargs):
"""
    Calculate the authority and hub scores. This is done in a two-step
    process:
    1. Create the message table and compute the authority scores.
    2. Use the computed authority scores to update the hub scores.
:param kwargs: dict of locals() of the calling function.
:return:
"""
    ###################################################################
    # The HITS scores for the nodes of a graph at any iteration 'i' are
    # calculated as follows:
    # authority_i(A) = hub_i(B) + hub_i(C) + ..., where B, C are nodes
    # that have edges pointing to node A.
    # After the authority scores of all nodes are calculated, the hub
    # scores are calculated as:
    # hub_i(A) = authority_i(D) + authority_i(E) + ..., where D, E are
    # nodes that A points to.
    # At the end of each iteration, all authority and hub scores are
    # normalized by their L2 norms.
    ###################################################################
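    # A tiny worked example (illustrative): for edges B->A, C->A, A->D with
    # all scores initialized to 1.0, the first iteration gives
    #   authority_1(A) = hub_0(B) + hub_0(C) = 2.0,  authority_1(D) = 1.0,
    #   hub_1(B) = hub_1(C) = authority_1(A) = 2.0,  hub_1(A) = 1.0,
    # and all other scores 0; each score vector is then divided by its L2
    # norm, e.g. the authority vector (2, 0, 0, 1) by SQRT(2^2 + 1^2).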
###################################################################
    # Calculate the authority scores.
    # If no node points to A, authority_i(A) = 0.
###################################################################
plpy.execute("""
CREATE TABLE {message} AS
SELECT {cur_grouping_cols_comma} {cur}.{vertex_id} AS {vertex_id},
COALESCE(SUM({curalias}.hub), 0.0) AS authority,
{cur}.hub AS hub
FROM {cur}
LEFT JOIN {edge_temp_table} ON
{cur_join_clause} {grouping_cols_join_condition_cur_and_edge}
LEFT JOIN {cur} AS {curalias} ON
{curalias_join_clause} {grouping_cols_join_condition_cur_and_curalias}
GROUP BY {cur_grouping_cols_comma} {cur}.{vertex_id}, {cur}.hub
{cur_distribution}
""".format(**kwargs))
###################################################################
    # Calculate the hub scores.
    # If node A does not point to any node, hub_i(A) = 0.
###################################################################
plpy.execute("""
UPDATE {message}
SET hub = {subquery2}.hub
FROM
(SELECT {msg_grouping_cols_comma} {message}.{vertex_id} AS {vertex_id},
COALESCE(SUM({msgalias}.authority), 0) AS hub
FROM {message}
LEFT JOIN {edge_temp_table} ON
{message_join_clause} {grouping_cols_join_condition_msg_and_edge}
LEFT JOIN {message} AS {msgalias} ON
{msgalias_join_clause} {grouping_cols_join_condition_msg_and_msgalias}
GROUP BY {msg_grouping_cols_comma} {message}.{vertex_id}) AS {subquery2}
WHERE {subquery2}.{vertex_id} = {message}.{vertex_id}
{grouping_cols_where_clause_msg_subquery2}
""".format(**kwargs))
    # Normalize the authority and hub scores by their L2 norms.
plpy.execute("""
CREATE TEMP TABLE {sum_norm_square_root} AS
SELECT {msg_grouping_cols_comma}
SQRT(SUM(POWER(authority, 2))) AS auth_sum_norm_square_root,
SQRT(SUM(POWER(hub, 2))) AS hub_sum_norm_square_root
FROM {message}
{group_by_msg_grouping_cols}
""".format(**kwargs))
num_zero_sum_norm_square_root = plpy.execute("""
SELECT COUNT(*) AS cnt
FROM {sum_norm_square_root}
WHERE auth_sum_norm_square_root = 0
OR hub_sum_norm_square_root = 0
""".format(**kwargs))[0]["cnt"]
    if num_zero_sum_norm_square_root > 0:
        plpy.error("Error while normalizing the authority and hub scores; "
                   "please make sure the input graph is a directed graph")
plpy.execute("""
UPDATE {message}
SET authority = {message}.authority/{subquery1}.auth_sum_norm_square_root,
hub = {message}.hub/{subquery1}.hub_sum_norm_square_root
            FROM (SELECT {grouping_cols_comma}
auth_sum_norm_square_root,
hub_sum_norm_square_root
FROM {sum_norm_square_root}) {subquery1}
{grouping_cols_where_condition_msg_subquery1}
""".format(**kwargs))
def hits_help(schema_madlib, message, **kwargs):
"""
Help function for hits
Args:
@param schema_madlib
@param message: string, Help message string
@param kwargs
Returns:
String. Help/usage information
"""
    if message is not None and \
            message.lower() in ("usage", "help", "?"):
        help_string = get_graph_usage(schema_madlib, 'HITS',
"""out_table TEXT, -- Name of the output table for HITS
max_iter INTEGER, -- Maximum iteration number (DEFAULT = 100)
threshold DOUBLE PRECISION, -- Stopping criteria (DEFAULT = 1/(N*1000),
-- N is number of vertices in the graph)
grouping_cols TEXT -- Comma separated column names to group on
-- (DEFAULT = NULL, no grouping)
""") + """
A summary table is also created that contains information regarding the
number of iterations required for convergence. It is named by adding the
suffix '_summary' to the 'out_table' parameter.
"""
else:
help_string = """
----------------------------------------------------------------------------
SUMMARY
----------------------------------------------------------------------------
Given a directed graph, the HITS algorithm finds the authority and hub
scores of all the vertices in the graph.
--
For an overview on usage, run:
SELECT {schema_madlib}.hits('usage');
"""
return help_string.format(schema_madlib=schema_madlib)
# ---------------------------------------------------------------------
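# Example invocation from SQL (illustrative only; 'vertex', 'edge' and
# 'hits_out' are hypothetical table names):
#   SELECT madlib.hits('vertex', 'id', 'edge', 'src=src, dest=dest',
#                      'hits_out', 10);
#   SELECT * FROM hits_out ORDER BY authority DESC;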