blob: 902c0d2a8b28d023ca1f01c054d6a2d6693c1fd0 [file] [log] [blame]
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Graph Measures
"""
@file measures.py_in
@namespace graph
"""
import plpy
from utilities.control import MinWarning
from utilities.utilities import _assert
from utilities.utilities import add_postfix
from utilities.utilities import extract_keyvalue_params
from utilities.validate_args import get_cols
from utilities.validate_args import unquote_ident
from utilities.validate_args import table_exists
from utilities.validate_args import table_is_empty
from utilities.validate_args import columns_exist_in_table
from graph_utils import get_graph_usage
from collections import namedtuple
from functools import partial
class Graph(object):
    """Graph described by a vertex table and an edge table.

    The measure methods (closeness, avg_path_length, in_out_degrees,
    diameter) each build one SQL statement from the stored table/column
    names and run it through ``plpy.execute`` to create ``out_table``.
    """

    def __init__(self,
                 vertex_table, vertex_col_names,
                 edge_table, edge_col_names,
                 grouping_cols=None,
                 should_validate=True,
                 schema_madlib='madlib'):
        """
        Args:
            @param vertex_table: str. Name of the table containing vertices
            @param vertex_col_names: dict. Must contain the key 'id' (name of
                                     the vertex id column)
            @param edge_table: str. Name of the table containing edges
            @param edge_col_names: dict. Must contain the keys 'src', 'dest'
                                   and 'weight' (edge column names)
            @param grouping_cols: str. Comma-separated grouping column names,
                                  spliced verbatim into the SQL; None or ''
                                  for no grouping
            @param should_validate: bool. Run _validate() on construction
            @param schema_madlib: str. Name of the MADlib schema
        """
        self.vertex_table = vertex_table
        self.vertex_id_col = vertex_col_names['id']
        self.edge_table = edge_table
        self.edge_params = namedtuple('Edge', 'src dest weight')(
            edge_col_names['src'], edge_col_names['dest'], edge_col_names['weight'])
        self.grouping_cols = grouping_cols
        self._madlib = schema_madlib
        if should_validate:
            self._validate()
    # ----------------------------------------------------------------------

    @staticmethod
    def get_edge_params(edge_arg_str):
        """Parse an edge-argument string into a src/dest/weight mapping.

        Missing keys fall back to the column names 'src', 'dest', 'weight'.
        """
        params_types = {'src': str, 'dest': str, 'weight': str}
        default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
        return extract_keyvalue_params(edge_arg_str, params_types, default_args)
    # ----------------------------------------------------------------------

    def _validate(self):
        """Assert that the vertex/edge tables exist, are non-empty and
        contain the configured columns."""
        _assert(self.vertex_table and
                self.vertex_table.strip().lower() not in ('null', ''),
                "Graph: Invalid vertex table name")
        _assert(table_exists(self.vertex_table),
                "Graph: Vertex table ({0}) is missing".format(self.vertex_table))
        _assert(not table_is_empty(self.vertex_table),
                "Graph: Vertex table ({0}) is empty".format(self.vertex_table))

        _assert(self.edge_table and
                self.edge_table.strip().lower() not in ('null', ''),
                "Graph: Invalid edge table name")
        _assert(table_exists(self.edge_table),
                "Graph: Edge table ({0}) is missing".format(self.edge_table))
        _assert(not table_is_empty(self.edge_table),
                "Graph: Edge table ({0}) is empty".format(self.edge_table))

        # Compare unquoted identifiers so a quoted vertex id column matches
        existing_cols = set(unquote_ident(i) for i in get_cols(self.vertex_table))
        _assert(unquote_ident(self.vertex_id_col) in existing_cols,
                "Graph: The vertex column {0} is not present in "
                "vertex table ({1})".format(self.vertex_id_col, self.vertex_table))
        _assert(columns_exist_in_table(self.edge_table, self.edge_params),
                "Graph: Not all columns from {0} are present in edge table ({1})".
                format(self.edge_params, self.edge_table))
    # ----------------------------------------------------------------------

    def closeness(self, out_table, apsp_table, vertex_filter_expr=None, **kwargs):
        """ Compute various centrality metrics

        Following metrics are computed for a vertex, considering only
        reachable vertices for that vertex:
            - inverse_sum_dist: Inverse of the sum of shortest distances
            - inverse_avg_dist: Inverse of the average of shortest distances
            - sum_inverse_dist: Sum of the inverse of shortest distances
            - k_degree: Total number of reachable vertices

        Args:
            @param out_table: str. Name of table to store results in
            @param apsp_table: str. APSP output is used to compute these metrics.
            @param vertex_filter_expr: str. A WHERE clause can be specified to restrict
                                        the output vertices
        Returns:
            None
        """
        grouping_cols_comma = self.grouping_cols + ", " if self.grouping_cols else ''
        e = self.edge_params
        filter_clause = "WHERE " + vertex_filter_expr if vertex_filter_expr else ''
        plpy.execute("""
            CREATE TABLE {out_table} AS
            SELECT
                {grouping_cols_comma}
                {e.src},
                1.0 / sum_dist AS inverse_sum_dist,
                CASE WHEN k_degree = 0 THEN NULL
                     ELSE k_degree::double precision / sum_dist
                END AS inverse_avg_dist,
                sum_inverse_dist,
                k_degree
            FROM (
                -- For below measures treat 'Infinity' as NULL so that
                -- 'sum' and 'avg' ignore the value.
                SELECT
                    {grouping_cols_comma}
                    {e.src},
                    sum(CASE WHEN {e.weight} = 'Infinity'::double precision THEN NULL
                             WHEN {e.src} = {e.dest} THEN NULL
                             ELSE {e.weight} END) AS sum_dist,
                    sum(CASE WHEN {e.weight} = 'Infinity'::double precision THEN NULL
                             WHEN {e.weight} = 0::double precision THEN 0.
                             ELSE 1.0 / {e.weight} END) AS sum_inverse_dist,
                    count(CASE WHEN {e.weight} = 'Infinity'::double precision THEN NULL
                               WHEN {e.src} = {e.dest} THEN NULL
                               ELSE 1 END) AS k_degree
                FROM {apsp_table}
                {filter_clause}
                GROUP BY {grouping_cols_comma}
                         {e.src}
                -- Don't place the 'Infinity' checks above in a WHERE clause
                -- since groups with only 'Infinity' rows need to show up
                -- in output table
            ) AS q
            """.format(**locals()))
    # --------------------------------------------------------------------------

    def avg_path_length(self, out_table, apsp_table, vertex_filter_expr=None, **kwargs):
        """ Compute the average path length of graph

        This is the average of the shortest path distances between unique
        vertices. Rows whose weight is 'Infinity' (unreachable pairs) are
        excluded so that a single disconnected pair does not turn the
        average into 'Infinity'.

        Args:
            @param out_table: str. Name of table to store results in
            @param apsp_table: str. APSP output is used to compute these metrics.
            @param vertex_filter_expr: str. Optional boolean SQL expression
                                        restricting the rows that are averaged
        Returns:
            None
        """
        if self.grouping_cols:
            grouping_cols_comma = self.grouping_cols + ", "
            group_by_str = 'GROUP BY ' + self.grouping_cols
        else:
            grouping_cols_comma = group_by_str = ''
        e = self.edge_params
        # Parenthesize the user expression: without it a top-level OR in the
        # filter would bind looser than the AND and bypass the predicates below.
        filter_clause = ("AND ({0})".format(vertex_filter_expr)
                         if vertex_filter_expr else '')
        plpy.execute("""
            CREATE TABLE {out_table} AS
            SELECT
                {grouping_cols_comma}
                AVG({e.weight}::double precision) as avg_path_length
            FROM {apsp_table}
            WHERE {e.src} != {e.dest} AND
                  {e.weight} != 'Infinity'::double precision
                  {filter_clause}
            {group_by_str}
            """.format(**locals()))
    # ----------------------------------------------------------------------

    def in_out_degrees(self, out_table, **kwargs):
        """ Compute the in-degree and out-degree of every vertex that has
        at least one edge in the edge table.

        Self-loops and edges with a NULL endpoint are not counted. A vertex
        appearing only as a source (or only as a destination) gets 0 for the
        missing degree via the FULL OUTER JOIN + coalesce.

        Args:
            @param out_table: str. Name of table to store results
        Returns:
            None
        """
        # TODO: validate if output columns names are in grouping_cols
        if self.grouping_cols:
            grouping_cols_comma = self.grouping_cols + ", "
        else:
            grouping_cols_comma = ''
        e = self.edge_params
        plpy.execute("""
            CREATE TABLE {out_table} AS
            SELECT
                {grouping_cols_comma}
                coalesce(in_q.vertex, out_q.vertex) as {self.vertex_id_col},
                coalesce(indegree, 0) as indegree,
                coalesce(outdegree, 0) as outdegree
            FROM
            (
                SELECT
                    {grouping_cols_comma}
                    {e.dest} as vertex,
                    count(*) as indegree
                FROM {self.edge_table}
                WHERE {e.src} != {e.dest} AND
                      {e.src} IS NOT NULL AND
                      {e.dest} IS NOT NULL
                GROUP BY {grouping_cols_comma}
                         {e.dest}
            ) as in_q
            FULL OUTER JOIN
            (
                SELECT
                    {grouping_cols_comma}
                    {e.src} as vertex,
                    count(*) as outdegree
                FROM {self.edge_table}
                WHERE {e.src} != {e.dest} AND
                      {e.src} IS NOT NULL AND
                      {e.dest} IS NOT NULL
                GROUP BY {grouping_cols_comma}
                         {e.src}
            ) as out_q
            USING ({grouping_cols_comma} vertex)
            """.format(**locals()))
    # --------------------------------------------------------------------------

    def diameter(self, out_table, apsp_table, **kwargs):
        """ Compute the diameter of graph

        Diameter is defined as the maximum of the shortest path distances between
        any pair of vertices. 'Infinity' weights are ignored when taking the
        maximum, and every (src, dest) pair attaining it is aggregated into
        diameter_end_vertices.

        Args:
            @param out_table: str. Name of table to store results in
            @param apsp_table: str. APSP output is used to compute these metrics.
        Returns:
            None
        """
        if self.grouping_cols:
            grouping_cols_comma = self.grouping_cols + ", "
            group_by_str = 'GROUP BY ' + self.grouping_cols
        else:
            grouping_cols_comma = group_by_str = ''
        e = self.edge_params
        plpy.execute("""
            CREATE TABLE {out_table} AS
            SELECT
                {grouping_cols_comma}
                {e.weight} AS diameter,
                {self._madlib}.matrix_agg(
                    ARRAY[{e.src}, {e.dest}]::double precision[])::BIGINT[]
                    AS diameter_end_vertices
            FROM
                {apsp_table} JOIN
                (
                    SELECT
                        {grouping_cols_comma}
                        max({e.weight}) as {e.weight}
                    FROM {apsp_table}
                    WHERE {e.weight} != 'Infinity'::double precision
                    {group_by_str}
                ) q
                USING ({grouping_cols_comma} {e.weight})
            GROUP BY {grouping_cols_comma} {e.weight}
            """.format(**locals()))
# ------------------------------------------------------------------------------
def graph_apsp_measures(schema_madlib, apsp_table, out_table,
                        measure_name, vertex_filter_expr=None, **kwargs):
    """ Define measure that depend on APSP output

    This function acts as a stub for all functions that depend on APSP being run
    prior. It validates the APSP table, its '_summary' companion table and the
    output table name, rebuilds a Graph from the summary row and dispatches to
    the Graph method called `measure_name`.

    Args:
        @param schema_madlib: str. Name of MADlib schema
        @param apsp_table: str. Name of table containing APSP results
        @param out_table: str. Name of table to store results in (must not exist)
        @param measure_name: str. Name of the Graph method to invoke
        @param vertex_filter_expr: str. Optional filter expression forwarded
                                    to the measure
    Returns:
        None
    """
    _assert(table_exists(apsp_table) and not table_is_empty(apsp_table),
            "Graph: Invalid APSP table: {0}".format(apsp_table))
    summary_table_name = add_postfix(apsp_table, "_summary")
    summary_cols = ['vertex_table', 'vertex_id',
                    'edge_table', 'edge_args', 'grouping_cols']
    _assert(table_exists(summary_table_name),
            "Graph: Summary APSP table ({0}) does not exist".format(summary_table_name))
    _assert(columns_exist_in_table(summary_table_name, summary_cols, schema_madlib),
            "Graph: Missing some columns from summary table ({0})".format(summary_table_name))
    # Guard the [0] lookup below: an empty summary table would otherwise
    # surface as a bare IndexError.
    _assert(not table_is_empty(summary_table_name),
            "Graph: Summary APSP table ({0}) is empty".format(summary_table_name))
    _assert(out_table and out_table.strip().lower() not in ('null', ''),
            "Graph: Invalid output table name ({0})".format(out_table))
    _assert(not table_exists(out_table),
            "Graph: Output table ({0}) already exists".format(out_table))

    with MinWarning('warning'):
        s = plpy.execute("SELECT * FROM {0}".format(summary_table_name))[0]
        edge_col_names = Graph.get_edge_params(s['edge_args'])
        # The original tables were validated when APSP ran, so skip
        # re-validation here.
        g = Graph(s['vertex_table'], dict([('id', s['vertex_id'])]),
                  s['edge_table'], edge_col_names,
                  s['grouping_cols'],
                  should_validate=False,
                  schema_madlib=schema_madlib)
        # Only the lookup decides "not implemented"; an AttributeError raised
        # inside the measure itself must propagate unchanged instead of being
        # misreported.
        measure_func = getattr(g, measure_name, None)
        if not callable(measure_func):
            plpy.error('Measure {0} not implemented yet'.format(measure_name))
        else:
            measure_func(out_table, apsp_table,
                         vertex_filter_expr=vertex_filter_expr)
# ----------------------------------------------------------------------
# Public entry points for the APSP-based measures. Each one is
# graph_apsp_measures with `measure_name` pre-bound as a keyword argument to
# the Graph method of the same name. Because `measure_name` is the fourth
# positional parameter of graph_apsp_measures, callers must pass
# `vertex_filter_expr` by keyword (a fourth positional argument would collide
# with the bound `measure_name`).
graph_closeness = partial(graph_apsp_measures, measure_name='closeness')
graph_diameter = partial(graph_apsp_measures, measure_name='diameter')
graph_avg_path_length = partial(graph_apsp_measures, measure_name='avg_path_length')
def graph_vertex_degrees(schema_madlib, vertex_table, vertex_id, edge_table,
                         edge_args, out_table, grouping_cols, **kwargs):
    """ Create `out_table` holding the in-degree and out-degree of each vertex.

    Args:
        @param schema_madlib: str. Name of MADlib schema
        @param vertex_table: str. Table name containing vertex data
        @param vertex_id: str. Column name containing ids of vertices
                          ('id' is used when this is empty/None)
        @param edge_table: str. Table name containing edge data
        @param edge_args: str. Parameters describing edges
        @param out_table: str. Name of table to store results
        @param grouping_cols: str. Columns to group computation by
    Returns:
        None
    """
    has_usable_name = out_table and out_table.strip().lower() not in ('null', '')
    _assert(has_usable_name, "Graph: Invalid output table name!")
    _assert(not table_exists(out_table), "Graph: Output table already exists!")

    # Constructing the Graph validates the vertex and edge tables
    # (should_validate defaults to True).
    graph = Graph(vertex_table,
                  {'id': vertex_id or 'id'},
                  edge_table,
                  Graph.get_edge_params(edge_args),
                  grouping_cols,
                  schema_madlib=schema_madlib)
    graph.in_out_degrees(out_table)
# ----------------------------------------------------------------------
# -----------------------------------------------------------------------
# All help functions
# -----------------------------------------------------------------------
def graph_closeness_help(schema_madlib, message, **kwargs):
    """ Return the help text for graph_closeness.

    Args:
        @param schema_madlib: str. Name of MADlib schema (substituted into
                              the text)
        @param message: str. Empty/None for a short summary; 'usage', 'help'
                        or '?' (case-insensitive) for the full usage text
    Returns:
        str. The help text, or a "No such option" hint for any other message
    """
    intro = """
The Closeness function returns various closeness centrality measures and the
k-degree for given subset of vertices. The closeness measures are the inverse of
the sum, the inverse of the average, and the sum of inverses of the shortest
distances to all reachable target vertices (excluding the source vertex).
"""
    if not message:
        help_string = intro + """
For more details:
SELECT {schema_madlib}.graph_closeness('usage')
"""
    elif message.lower() in ['usage', 'help', '?']:
        # Column names listed under OUTPUT must match those created by
        # Graph.closeness (inverse_avg_dist, not inverse_average_dist).
        help_string = intro + """
----------------------------------------------------------------------------
USAGE
----------------------------------------------------------------------------
SELECT {schema_madlib}.graph_closeness(
apsp_table TEXT, -- Name of table containing APSP results
out_table TEXT, -- Name of table to store Closeness measures
vertex_filter_expr TEXT -- Valid PostgreSQL expression that describes the
-- vertices to generate closeness measures for.
)
----------------------------------------------------------------------------
OUTPUT
----------------------------------------------------------------------------
The output table contains a row for every vertex of every group and have
the following columns (in addition to the grouping columns):
- inverse_sum_dist : Inverse of the sum of shortest distances to all reachable
vertices.
- inverse_avg_dist: Inverse of the average of shortest distances to all
reachable vertices.
- sum_inverse_dist : Sum of the inverse of shortest distances to all reachable
vertices.
- k_degree : Total number of reachable vertices.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.graph_closeness()"
    return help_string.format(schema_madlib=schema_madlib)
# -------------------------------------------------------------------------
def graph_diameter_help(schema_madlib, message, **kwargs):
    """ Return the help text for graph_diameter.

    Args:
        @param schema_madlib: str. Name of MADlib schema (substituted into
                              the text)
        @param message: str. Empty/None for a short summary; 'usage', 'help'
                        or '?' (case-insensitive) for the full usage text
    Returns:
        str. The help text, or a "No such option" hint for any other message
    """
    intro = """
Diameter is defined as the longest of all shortest paths in a graph.
"""
    if not message:
        help_string = intro + """
For more details:
SELECT {schema_madlib}.graph_diameter('usage')
"""
    elif message.lower() in ['usage', 'help', '?']:
        help_string = intro + """
----------------------------------------------------------------------------
USAGE
----------------------------------------------------------------------------
SELECT {schema_madlib}.graph_diameter(
apsp_table TEXT, -- Name of table containing APSP results
out_table TEXT -- Name of table to store the diameter results
)
----------------------------------------------------------------------------
OUTPUT
----------------------------------------------------------------------------
It contains a row for every group, the diameter value and the two vertices
that are the farthest apart.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.graph_diameter()"
    return help_string.format(schema_madlib=schema_madlib)
# -------------------------------------------------------------------------
def graph_avg_path_length_help(schema_madlib, message, **kwargs):
    """ Return the help text for graph_avg_path_length.

    Args:
        @param schema_madlib: str. Name of MADlib schema (substituted into
                              the text)
        @param message: str. Empty/None for a short summary; 'usage', 'help'
                        or '?' (case-insensitive) for the full usage text
    Returns:
        str. The help text, or a "No such option" hint for any other message
    """
    intro = """
This function computes the average of the shortest paths between each pair of
vertices. Average path length is based on "reachable target vertices", so it
ignores infinite-length paths between vertices that are not connected.
"""
    if not message:
        help_string = intro + """
For more details:
SELECT {schema_madlib}.graph_avg_path_length('usage')
"""
    elif message.lower() in ['usage', 'help', '?']:
        help_string = intro + """
----------------------------------------------------------------------------
USAGE
----------------------------------------------------------------------------
SELECT {schema_madlib}.graph_avg_path_length(
apsp_table TEXT, -- Name of table containing APSP results
out_table TEXT -- Name of table to store the average path length
)
----------------------------------------------------------------------------
OUTPUT
----------------------------------------------------------------------------
It contains a row for every group, and the average path value.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.graph_avg_path_length()"
    return help_string.format(schema_madlib=schema_madlib)
# -------------------------------------------------------------------------
def graph_vertex_degrees_help(schema_madlib, message, **kwargs):
    """ Return the help text for graph_vertex_degrees.

    Args:
        @param schema_madlib: str. Name of MADlib schema (substituted into
                              the text)
        @param message: str. Empty/None for a short summary; 'usage', 'help'
                        or '?' (case-insensitive) for the full usage text
    Returns:
        str. The help text, or a "No such option" hint for any other message
    """
    intro = """
This function computes the degree of each node. The node degree is the number of
edges adjacent to that node. The node in-degree is the number of edges pointing
in to the node and node out-degree is the number of edges pointing out of the
node.
"""
    # Shared graph usage header, extended with the parameters specific to
    # this function.
    usage_text = get_graph_usage(schema_madlib,
                                 'graph_vertex_degrees',
                                 """
out_table TEXT, -- Name of the table to store the vertex degrees.
grouping_cols TEXT -- The list of grouping columns.""")
    if not message:
        help_string = intro + """
For more details:
SELECT {schema_madlib}.graph_vertex_degrees('usage')
"""
    elif message.lower() in ['usage', 'help', '?']:
        help_string = intro + """
{graph_usage}
----------------------------------------------------------------------------
OUTPUT
----------------------------------------------------------------------------
It contains a row for every vertex of every group and has the following columns
(in addition to the grouping columns):
- vertex : The id for the source vertex. Will use the input vertex
column 'id' for column naming.
- indegree : Number of incoming edges to the vertex.
- outdegree : Number of outgoing edges from the vertex.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.graph_vertex_degrees()"
    # graph_usage is only referenced by the usage branch; str.format ignores
    # the unused keyword for the other two messages.
    return help_string.format(schema_madlib=schema_madlib,
                              graph_usage=usage_text)
# -------------------------------------------------------------------------