| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| # Graph Measures |
| |
| """ |
| @file measures.py_in |
| |
| @namespace graph |
| """ |
| |
| import plpy |
| |
| from utilities.control import MinWarning |
| from utilities.utilities import _assert |
| from utilities.utilities import add_postfix |
| from utilities.utilities import extract_keyvalue_params |
| |
| from utilities.validate_args import get_cols |
| from utilities.validate_args import unquote_ident |
| from utilities.validate_args import table_exists |
| from utilities.validate_args import table_is_empty |
| from utilities.validate_args import columns_exist_in_table |
| |
| from graph_utils import get_graph_usage |
| |
| from collections import namedtuple |
| from functools import partial |
| |
| |
class Graph(object):
    """Graph described by a vertex table and an edge table.

    The object only stores table and column names; every measure method
    splices those names into a SQL statement and runs it with
    ``plpy.execute``, creating a new output table.
    """

    def __init__(self,
                 vertex_table, vertex_col_names,
                 edge_table, edge_col_names,
                 grouping_cols=None,
                 should_validate=True,
                 schema_madlib='madlib'):
        """
        Args:
            @param vertex_table: str. Name of the table containing vertex data
            @param vertex_col_names: dict. Must provide the key 'id' (name of
                                     the vertex id column)
            @param edge_table: str. Name of the table containing edge data
            @param edge_col_names: dict. Must provide the keys 'src', 'dest'
                                   and 'weight' (edge column names)
            @param grouping_cols: str. Column list that is placed verbatim in
                                  SELECT / GROUP BY / USING clauses
            @param should_validate: bool. Run _validate() on construction
            @param schema_madlib: str. Name of MADlib schema
        """
        self.vertex_table = vertex_table
        self.vertex_id_col = vertex_col_names['id']

        self.edge_table = edge_table
        self.edge_params = namedtuple('Edge', 'src dest weight')(
            edge_col_names['src'], edge_col_names['dest'], edge_col_names['weight'])

        self.grouping_cols = grouping_cols
        self._madlib = schema_madlib

        if should_validate:
            self._validate()

    # ----------------------------------------------------------------------

    @staticmethod
    def get_edge_params(edge_arg_str):
        """Parse an edge-argument string into a dict of edge column names.

        Columns that are not named in edge_arg_str default to 'src', 'dest'
        and 'weight'.
        """
        params_types = {'src': str, 'dest': str, 'weight': str}
        default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
        return extract_keyvalue_params(edge_arg_str, params_types, default_args)
    # ----------------------------------------------------------------------

    def _validate(self):
        """Check that vertex/edge tables and their columns are usable.

        Each table must have a valid name, exist and be non-empty; the vertex
        id column and all edge columns must be present in their tables.
        """
        _assert(self.vertex_table and
                self.vertex_table.strip().lower() not in ('null', ''),
                "Graph: Invalid vertex table name")
        _assert(table_exists(self.vertex_table),
                "Graph: Vertex table ({0}) is missing".format(self.vertex_table))
        _assert(not table_is_empty(self.vertex_table),
                "Graph: Vertex table ({0}) is empty".format(self.vertex_table))

        _assert(self.edge_table and
                self.edge_table.strip().lower() not in ('null', ''),
                "Graph: Invalid edge table name")
        _assert(table_exists(self.edge_table),
                "Graph: Edge table ({0}) is missing".format(self.edge_table))
        _assert(not table_is_empty(self.edge_table),
                "Graph: Edge table ({0}) is empty".format(self.edge_table))

        # Compare unquoted names so that quoted identifiers still match.
        existing_cols = set(unquote_ident(i) for i in get_cols(self.vertex_table))
        _assert(unquote_ident(self.vertex_id_col) in existing_cols,
                "Graph: The vertex column {0} is not present in "
                "vertex table ({1})".format(self.vertex_id_col, self.vertex_table))

        _assert(columns_exist_in_table(self.edge_table, self.edge_params),
                "Graph: Not all columns from {0} are present in edge table ({1})".
                format(self.edge_params, self.edge_table))
    # ----------------------------------------------------------------------

    def closeness(self, out_table, apsp_table, vertex_filter_expr=None, **kwargs):
        """ Compute various centrality metrics

        Following metrics are computed for a vertex, considering only
        reachable vertices for that vertex:
            - inverse_sum_dist: Inverse of the sum of shortest distances
            - inverse_avg_dist: Inverse of the average of shortest distances
            - sum_inverse_dist: Sum of the inverse of shortest distances
            - k_degree: Total number of reachable vertices

        Args:
            @param out_table: str. Name of table to store results in
            @param apsp_table: str. APSP output is used to compute these metrics.
            @param vertex_filter_expr: str. A WHERE clause can be specified to restrict
                                        the output vertices
        Returns:
            None
        """
        grouping_cols_comma = self.grouping_cols + ", " if self.grouping_cols else ''
        e = self.edge_params
        filter_clause = "WHERE " + vertex_filter_expr if vertex_filter_expr else ''

        # sum() over only-NULL inputs yields NULL, so a vertex without any
        # reachable vertex gets NULL for inverse_sum_dist and inverse_avg_dist.
        plpy.execute("""
            CREATE TABLE {out_table} AS
            SELECT
                {grouping_cols_comma}
                {e.src},
                1.0 / sum_dist AS inverse_sum_dist,
                CASE WHEN k_degree = 0 THEN NULL
                     ELSE k_degree::double precision / sum_dist
                END AS inverse_avg_dist,
                sum_inverse_dist,
                k_degree
            FROM (
                -- For below measures treat 'Infinity' as NULL so that
                -- 'sum' and 'avg' ignore the value.
                SELECT
                    {grouping_cols_comma}
                    {e.src},
                    sum(CASE WHEN {e.weight} = 'Infinity'::double precision THEN NULL
                             WHEN {e.src} = {e.dest} THEN NULL
                             ELSE {e.weight} END) AS sum_dist,
                    sum(CASE WHEN {e.weight} = 'Infinity'::double precision THEN NULL
                             WHEN {e.weight} = 0::double precision THEN 0.
                             ELSE 1.0 / {e.weight} END) AS sum_inverse_dist,
                    count(CASE WHEN {e.weight} = 'Infinity'::double precision THEN NULL
                               WHEN {e.src} = {e.dest} THEN NULL
                               ELSE 1 END) AS k_degree
                FROM {apsp_table}
                {filter_clause}
                GROUP BY {grouping_cols_comma}
                         {e.src}
                -- Don't place the 'Infinity' checks above in a WHERE clause
                -- since groups with only 'Infinity' rows need to show up
                -- in output table
            ) AS q
        """.format(**locals()))
    # --------------------------------------------------------------------------

    def avg_path_length(self, out_table, apsp_table, vertex_filter_expr=None, **kwargs):
        """ Compute the average path length of graph

        This is the average of the shortest path distances between unique
        vertices. Pairs whose distance is 'Infinity' (no path) are excluded,
        otherwise a single unreachable pair would make the average infinite.

        Args:
            @param out_table: str. Name of table to store results in
            @param apsp_table: str. APSP output is used to compute these metrics.
            @param vertex_filter_expr: str. Optional boolean SQL expression
                                        restricting the APSP rows that are
                                        averaged
        Returns:
            None
        """
        if self.grouping_cols:
            grouping_cols_comma = self.grouping_cols + ", "
            group_by_str = 'GROUP BY ' + self.grouping_cols
        else:
            grouping_cols_comma = group_by_str = ''
        e = self.edge_params
        # Parenthesize the caller's expression: an 'OR' inside it must not
        # bypass the conditions it is AND-ed with.
        if vertex_filter_expr:
            filter_clause = "AND ({0})".format(vertex_filter_expr)
        else:
            filter_clause = ''
        plpy.execute("""
            CREATE TABLE {out_table} AS
            SELECT
                {grouping_cols_comma}
                AVG({e.weight}::double precision) as avg_path_length
            FROM {apsp_table}
            WHERE {e.src} != {e.dest} AND
                  {e.weight} != 'Infinity'::double precision
                  {filter_clause}
            {group_by_str}
        """.format(**locals()))
    # ----------------------------------------------------------------------

    def in_out_degrees(self, out_table, **kwargs):
        """ Compute the in-degree and out-degree of every vertex with an edge

        Self-loops and edges with a NULL endpoint are not counted. A vertex
        that only appears as a source (or only as a destination) gets 0 for
        the missing degree.

        Args:
            @param out_table: str. Name of table to store results

        Returns:
            None
        """
        # TODO: validate if output columns names are in grouping_cols
        if self.grouping_cols:
            grouping_cols_comma = self.grouping_cols + ", "
        else:
            grouping_cols_comma = ''
        e = self.edge_params

        plpy.execute("""
            CREATE TABLE {out_table} AS
            SELECT
                {grouping_cols_comma}
                coalesce(in_q.vertex, out_q.vertex) as {self.vertex_id_col},
                coalesce(indegree, 0) as indegree,
                coalesce(outdegree, 0) as outdegree
            FROM
            (
                SELECT
                    {grouping_cols_comma}
                    {e.dest} as vertex,
                    count(*) as indegree
                FROM {self.edge_table}
                WHERE {e.src} != {e.dest} AND
                      {e.src} IS NOT NULL AND
                      {e.dest} IS NOT NULL
                GROUP BY {grouping_cols_comma}
                         {e.dest}
            ) as in_q
            FULL OUTER JOIN
            (
                SELECT
                    {grouping_cols_comma}
                    {e.src} as vertex,
                    count(*) as outdegree
                FROM {self.edge_table}
                WHERE {e.src} != {e.dest} AND
                      {e.src} IS NOT NULL AND
                      {e.dest} IS NOT NULL
                GROUP BY {grouping_cols_comma}
                         {e.src}
            ) as out_q
            USING ({grouping_cols_comma} vertex)
        """.format(**locals()))
    # --------------------------------------------------------------------------

    def diameter(self, out_table, apsp_table, **kwargs):
        """ Compute the diameter of graph

        Diameter is defined as the maximum of the shortest path distances between
        any pair of vertices. 'Infinity' distances are ignored. Every vertex
        pair that attains the maximum is collected in diameter_end_vertices.

        Args:
            @param out_table: str. Name of table to store results in
            @param apsp_table: str. APSP output is used to compute these metrics.
        Returns:
            None
        """
        if self.grouping_cols:
            grouping_cols_comma = self.grouping_cols + ", "
            group_by_str = 'GROUP BY ' + self.grouping_cols
        else:
            grouping_cols_comma = group_by_str = ''
        e = self.edge_params
        # The subquery finds the maximum finite distance (per group); joining
        # it back to the APSP table picks the rows that attain that maximum.
        plpy.execute("""
            CREATE TABLE {out_table} AS
            SELECT
                {grouping_cols_comma}
                {e.weight} AS diameter,
                {self._madlib}.matrix_agg(
                    ARRAY[{e.src}, {e.dest}]::double precision[])::integer[]
                    AS diameter_end_vertices
            FROM
                {apsp_table} JOIN
                (
                    SELECT
                        {grouping_cols_comma}
                        max({e.weight}) as {e.weight}
                    FROM {apsp_table}
                    WHERE {e.weight} != 'Infinity'::double precision
                    {group_by_str}
                ) q
                USING ({grouping_cols_comma} {e.weight})
            GROUP BY {grouping_cols_comma} {e.weight}
        """.format(**locals()))
| # ------------------------------------------------------------------------------ |
| |
| |
def graph_apsp_measures(schema_madlib, apsp_table, out_table,
                        measure_name, vertex_filter_expr=None, **kwargs):
    """ Define measure that depend on APSP output

    This function acts as a stub for all functions that depend on APSP being run
    prior. It reads the graph description from the first row of the APSP
    summary table (<apsp_table> with a "_summary" postfix), builds a Graph
    from it without re-validating the tables, and calls the Graph method
    named by measure_name.

    Args:
        @param schema_madlib: str. Name of MADlib schema
        @param apsp_table: str. Name of table containing APSP results
        @param out_table: str. Name of table to store results in; must not
                          exist yet
        @param measure_name: str. Name of the Graph method to run
        @param vertex_filter_expr: str. Optional filter expression, forwarded
                                    to the measure
    Returns:
        None
    """

    _assert(table_exists(apsp_table) and not table_is_empty(apsp_table),
            "Graph: Invalid APSP table: {0}".format(apsp_table))

    summary_table_name = add_postfix(apsp_table, "_summary")
    summary_cols = ['vertex_table', 'vertex_id',
                    'edge_table', 'edge_args', 'grouping_cols']
    _assert(table_exists(summary_table_name),
            "Graph: Summary APSP table ({0}) does not exist".format(summary_table_name))
    _assert(columns_exist_in_table(summary_table_name, summary_cols, schema_madlib),
            "Graph: Missing some columns from summary table ({0})".format(summary_table_name))
    _assert(out_table and out_table.strip().lower() not in ('null', ''),
            "Graph: Invalid output table name ({0})".format(out_table))
    _assert(not table_exists(out_table),
            "Graph: Output table ({0}) already exists".format(out_table))

    with MinWarning('warning'):
        s = plpy.execute("SELECT * FROM {0}".format(summary_table_name))[0]
        edge_col_names = Graph.get_edge_params(s['edge_args'])
        g = Graph(s['vertex_table'], dict([('id', s['vertex_id'])]),
                  s['edge_table'], edge_col_names,
                  s['grouping_cols'],
                  should_validate=False,
                  schema_madlib=schema_madlib)
        # Guard only the lookup: an AttributeError raised while the measure
        # itself runs is a genuine failure and must not be reported as
        # "not implemented".
        try:
            measure_func = getattr(g, measure_name)
        except AttributeError:
            plpy.error('Measure {0} not implemented yet'.format(measure_name))
        else:
            measure_func(out_table, apsp_table,
                         vertex_filter_expr=vertex_filter_expr)
| # ---------------------------------------------------------------------- |
| |
| |
# Public entry points for the APSP-based measures: each one is
# graph_apsp_measures with measure_name pre-bound to a Graph method name.
graph_closeness, graph_diameter, graph_avg_path_length = (
    partial(graph_apsp_measures, measure_name=_name)
    for _name in ('closeness', 'diameter', 'avg_path_length'))
| |
| |
def graph_vertex_degrees(schema_madlib, vertex_table, vertex_id, edge_table,
                         edge_args, out_table, grouping_cols, **kwargs):
    """ Create a table with the in-degree and out-degree of the graph vertices

    The output table name is checked first; the Graph constructor then
    validates the vertex and edge tables before the degrees are computed.

    Args:
        @param schema_madlib: str. Name of MADlib schema
        @param vertex_table: str. Table name containing vertex data
        @param vertex_id: str. Column name containing ids of vertices
                          ('id' is used when empty)
        @param edge_table: str. Table name containing edge data
        @param edge_args: str. Parameters describing edges
        @param out_table: str. Name of table to store results
        @param grouping_cols: str. Columns to group computation by

    Returns:
        None
    """
    has_out_name = out_table and out_table.strip().lower() not in ('null', '')
    _assert(has_out_name, "Graph: Invalid output table name!")
    out_is_free = not table_exists(out_table)
    _assert(out_is_free, "Graph: Output table already exists!")

    vertex_cols = {'id': vertex_id or 'id'}
    edge_cols = Graph.get_edge_params(edge_args)
    graph = Graph(vertex_table, vertex_cols,
                  edge_table, edge_cols,
                  grouping_cols,
                  schema_madlib=schema_madlib)
    graph.in_out_degrees(out_table)
| # ---------------------------------------------------------------------- |
| |
| # ----------------------------------------------------------------------- |
| # All help functions |
| # ----------------------------------------------------------------------- |
| |
def graph_closeness_help(schema_madlib, message, **kwargs):
    """ Return the help text of graph_closeness

    Args:
        @param schema_madlib: str. Name of MADlib schema
        @param message: str. Empty/None for the short summary; 'usage',
                        'help' or '?' for the full usage text
    Returns:
        str. Help text with the schema name filled in
    """

    intro = """
The Closeness function returns various closeness centrality measures and the
k-degree for given subset of vertices. The closeness measures are the inverse of
the sum, the inverse of the average, and the sum of inverses of the shortest
distances to all reachable target vertices (excluding the source vertex).
"""

    if not message:
        help_string = intro + """
For more details:
    SELECT {schema_madlib}.graph_closeness('usage')
"""
    elif message.lower() in ['usage', 'help', '?']:
        # Column names below must match the ones created by Graph.closeness.
        help_string = intro + """

----------------------------------------------------------------------------
                                USAGE
----------------------------------------------------------------------------
SELECT {schema_madlib}.graph_closeness(
    apsp_table          TEXT, -- Name of table containing APSP results
    out_table           TEXT, -- Name of table to store Closeness measures
    vertex_filter_expr  TEXT  -- Valid PostgreSQL expression that describes the
                              -- vertices to generate closeness measures for.
)

----------------------------------------------------------------------------
                                OUTPUT
----------------------------------------------------------------------------
The output table contains a row for every vertex of every group and have
the following columns (in addition to the grouping columns):
  - inverse_sum_dist  : Inverse of the sum of shortest distances to all reachable
                        vertices.
  - inverse_avg_dist  : Inverse of the average of shortest distances to all
                        reachable vertices.
  - sum_inverse_dist  : Sum of the inverse of shortest distances to all reachable
                        vertices.
  - k_degree          : Total number of reachable vertices.

"""
    else:
        help_string = "No such option. Use {schema_madlib}.graph_closeness()"

    return help_string.format(schema_madlib=schema_madlib)
| # ------------------------------------------------------------------------- |
| |
| |
def graph_diameter_help(schema_madlib, message, **kwargs):
    """ Return the help text of graph_diameter

    Args:
        @param schema_madlib: str. Name of MADlib schema
        @param message: str. Empty/None for the short summary; 'usage',
                        'help' or '?' for the full usage text
    Returns:
        str. Help text with the schema name filled in
    """

    intro = """
Diameter is defined as the longest of all shortest paths in a graph.
"""

    if not message:
        help_string = intro + """
For more details:
    SELECT {schema_madlib}.graph_diameter('usage')
"""
    elif message.lower() in ['usage', 'help', '?']:
        help_string = intro + """

----------------------------------------------------------------------------
                                USAGE
----------------------------------------------------------------------------
SELECT {schema_madlib}.graph_diameter(
    apsp_table  TEXT, -- Name of table containing APSP results
    out_table   TEXT  -- Name of table to store the diameter result
)

----------------------------------------------------------------------------
                                OUTPUT
----------------------------------------------------------------------------
It contains a row for every group, the diameter value and the two vertices
that are the farthest apart.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.graph_diameter()"

    return help_string.format(schema_madlib=schema_madlib)
| # ------------------------------------------------------------------------- |
| |
| |
def graph_avg_path_length_help(schema_madlib, message, **kwargs):
    """ Return the help text of graph_avg_path_length

    Args:
        @param schema_madlib: str. Name of MADlib schema
        @param message: str. Empty/None for the short summary; 'usage',
                        'help' or '?' for the full usage text
    Returns:
        str. Help text with the schema name filled in
    """

    intro = """
This function computes the average of the shortest paths between each pair of
vertices. Average path length is based on "reachable target vertices", so it
ignores infinite-length paths between vertices that are not connected.
"""

    if not message:
        help_string = intro + """
For more details:
    SELECT {schema_madlib}.graph_avg_path_length('usage')
"""
    elif message.lower() in ['usage', 'help', '?']:
        help_string = intro + """

----------------------------------------------------------------------------
                                USAGE
----------------------------------------------------------------------------
SELECT {schema_madlib}.graph_avg_path_length(
    apsp_table  TEXT, -- Name of table containing APSP results
    out_table   TEXT  -- Name of table to store the average path length
)

----------------------------------------------------------------------------
                                OUTPUT
----------------------------------------------------------------------------
It contains a row for every group, and the average path value.
"""
    else:
        help_string = "No such option. Use {schema_madlib}.graph_avg_path_length()"

    return help_string.format(schema_madlib=schema_madlib)
| # ------------------------------------------------------------------------- |
| |
| |
def graph_vertex_degrees_help(schema_madlib, message, **kwargs):
    """ Return the help text of graph_vertex_degrees

    Args:
        @param schema_madlib: str. Name of MADlib schema
        @param message: str. Empty/None for the short summary; 'usage',
                        'help' or '?' for the full usage text
    Returns:
        str. Help text with the schema name filled in
    """

    intro = """
This function computes the degree of each node. The node degree is the number of
edges adjacent to that node. The node in-degree is the number of edges pointing
in to the node and node out-degree is the number of edges pointing out of the
node.
"""
    # The usage section comes from get_graph_usage; only the arguments
    # specific to this function are supplied here.
    usage_text = get_graph_usage(schema_madlib,
                                 'graph_vertex_degrees',
                                 """
    out_table     TEXT, -- Name of the table to store the vertex degrees.
    grouping_cols TEXT  -- The list of grouping columns.""")

    if not message:
        help_string = intro + """
For more details:
    SELECT {schema_madlib}.graph_vertex_degrees('usage')
"""
    elif message.lower() in ['usage', 'help', '?']:
        help_string = intro + """
{graph_usage}

----------------------------------------------------------------------------
                                OUTPUT
----------------------------------------------------------------------------
It contains a row for every vertex of every group and has the following columns
(in addition to the grouping columns):
  - vertex    : The id for the source vertex. Will use the input vertex
                column 'id' for column naming.
  - indegree  : Number of incoming edges to the vertex.
  - outdegree : Number of outgoing edges from the vertex.

"""
    else:
        help_string = "No such option. Use {schema_madlib}.graph_vertex_degrees()"

    return help_string.format(schema_madlib=schema_madlib,
                              graph_usage=usage_text)
| # ------------------------------------------------------------------------- |