# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Weakly Connected Components
# Please refer to the wcc.sql_in file for the documentation
"""
@file wcc.py_in
@namespace graph
"""
import plpy
from utilities.utilities import _assert
from utilities.utilities import _check_groups
from utilities.utilities import get_table_qualified_col_str
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string, split_quoted_delimited_str
from utilities.validate_args import columns_exist_in_table, get_expr_type
from utilities.utilities import is_platform_pg
from utilities.utilities import add_postfix
from utilities.validate_args import table_exists
from utilities.utilities import rename_table
from utilities.control import MinWarning
from graph_utils import validate_graph_coding, get_graph_usage
from graph_utils import validate_output_and_summary_tables
def validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table,
edge_params, out_table, out_table_summary,
grouping_cols_list, module_name):
"""
Function to validate input parameters for wcc
"""
validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
out_table, module_name)
_assert(not table_exists(out_table_summary),
"Graph {module_name}: Output summary table already exists!".format(**locals()))
if grouping_cols_list:
# Validate the grouping columns. We currently only support grouping_cols
# that are column names in the edge_table, not arbitrary expressions.
_assert(columns_exist_in_table(edge_table, grouping_cols_list, schema_madlib),
"Weakly Connected Components error: "
"One or more grouping columns specified do not exist!")
def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
out_table, grouping_cols, **kwargs):
"""
Function that computes weakly connected components
Args:
@param vertex_table
@param vertex_id
@param edge_table
@param edge_args
@param out_table
@param grouping_cols
"""
old_msg_level = plpy.execute("""
SELECT setting
FROM pg_settings
WHERE name='client_min_messages'
""")[0]['setting']
plpy.execute('SET client_min_messages TO warning')
params_types = {'src': str, 'dest': str}
default_args = {'src': 'src', 'dest': 'dest'}
edge_params = extract_keyvalue_params(
edge_args, params_types, default_args)
# populate default values for optional params if null, and prepare data
# to be written into the summary table (*_st variable names)
if not vertex_id:
vertex_id = "id"
v_st = "id"
else:
v_st = vertex_id
if not grouping_cols:
grouping_cols = ''
out_table_summary = ''
if out_table:
out_table_summary = add_postfix(out_table, "_summary")
grouping_cols_list = split_quoted_delimited_str(grouping_cols)
validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table,
edge_params, out_table, out_table_summary,
grouping_cols_list, 'Weakly Connected Components')
src = edge_params["src"]
dest = edge_params["dest"]
message = unique_string(desp='message')
oldupdate = unique_string(desp='oldupdate')
newupdate = unique_string(desp='newupdate')
toupdate = unique_string(desp='toupdate')
temp_out_table = unique_string(desp='tempout')
edge_inverse = unique_string(desp='edge_inverse')
distribution = '' if is_platform_pg() else \
"DISTRIBUTED BY ({0})".format(vertex_id)
subq_prefixed_grouping_cols = ''
comma_toupdate_prefixed_grouping_cols = ''
comma_oldupdate_prefixed_grouping_cols = ''
old_new_update_where_condition = ''
new_to_update_where_condition = ''
edge_to_update_where_condition = ''
edge_inverse_to_update_where_condition = ''
BIGINT_MAX = 9223372036854775807
component_id = 'component_id'
grouping_cols_comma = '' if not grouping_cols else grouping_cols + ','
comma_grouping_cols = '' if not grouping_cols else ',' + grouping_cols
if not is_platform_pg():
# In Greenplum, to avoid redistributing data in later queries, the
# edge_table is duplicated into a temporary table distributed on the
# dest column.
plpy.execute(""" CREATE TEMP TABLE {edge_inverse} AS
SELECT * FROM {edge_table} DISTRIBUTED BY ({dest})
""".format(**locals()))
else:
edge_inverse = edge_table
if grouping_cols:
distribution = ('' if is_platform_pg() else
"DISTRIBUTED BY ({0}, {1})".format(grouping_cols,
vertex_id))
# Update some variables useful for grouping based query strings
subq = unique_string(desp='subquery')
distinct_grp_table = unique_string(desp='grptable')
plpy.execute("""
CREATE TABLE {distinct_grp_table} AS
SELECT DISTINCT {grouping_cols} FROM {edge_table}
""".format(**locals()))
comma_toupdate_prefixed_grouping_cols = ', ' + \
get_table_qualified_col_str(toupdate, grouping_cols_list)
comma_oldupdate_prefixed_grouping_cols = ', ' + \
get_table_qualified_col_str(oldupdate, grouping_cols_list)
subq_prefixed_grouping_cols = get_table_qualified_col_str(subq, grouping_cols_list)
old_new_update_where_condition = ' AND ' + \
_check_groups(oldupdate, newupdate, grouping_cols_list)
new_to_update_where_condition = ' AND ' + \
_check_groups(newupdate, toupdate, grouping_cols_list)
edge_to_update_where_condition = ' AND ' + \
_check_groups(edge_table, toupdate, grouping_cols_list)
edge_inverse_to_update_where_condition = ' AND ' + \
_check_groups(edge_inverse, toupdate, grouping_cols_list)
join_grouping_cols = _check_groups(subq, distinct_grp_table, grouping_cols_list)
group_by_clause_newupdate = ('' if not grouping_cols else
'{0}, {1}.{2}'.format(subq_prefixed_grouping_cols,
subq, vertex_id))
plpy.execute("""
CREATE TABLE {newupdate} AS
SELECT {subq}.{vertex_id},
CAST({BIGINT_MAX} AS BIGINT) AS {component_id}
{select_grouping_cols}
FROM {distinct_grp_table} INNER JOIN (
SELECT {select_grouping_cols_clause} {src} AS {vertex_id}
FROM {edge_table}
UNION
SELECT {select_grouping_cols_clause} {dest} AS {vertex_id}
FROM {edge_inverse}
) {subq}
ON {join_grouping_cols}
GROUP BY {group_by_clause_newupdate}
{distribution}
""".format(select_grouping_cols=',' + subq_prefixed_grouping_cols,
select_grouping_cols_clause=grouping_cols_comma,
**locals()))
# drop intermediate table
plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
plpy.execute("""
CREATE TEMP TABLE {message} AS
SELECT {vertex_id},
CAST({vertex_id} AS BIGINT) AS {component_id}
{select_grouping_cols_clause}
FROM {newupdate}
{distribution}
""".format(select_grouping_cols_clause=comma_grouping_cols,
**locals()))
else:
plpy.execute("""
CREATE TABLE {newupdate} AS
SELECT {vertex_id}, CAST({BIGINT_MAX} AS BIGINT) AS {component_id}
FROM {vertex_table}
{distribution}
""".format(**locals()))
plpy.execute("""
CREATE TEMP TABLE {message} AS
SELECT {vertex_id}, CAST({vertex_id} AS BIGINT) AS {component_id}
FROM {vertex_table}
{distribution}
""".format(**locals()))
nodes_to_update = 1
while nodes_to_update > 0:
# Look at all the neighbors of a node, and assign the smallest node id
# among the neighbors as its component_id. The newupdate table starts
# off with a very high component_id (BIGINT_MAX). Nodes that obtain a
# smaller component_id after looking at their neighbors are updated in
# the newupdate table. At every iteration, only those nodes whose
# component_id in the previous iteration is greater than the value found
# in the current iteration are updated.
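# Worked example (illustrative): for the edges (1,2) and (2,3), every vertex
# starts with component_id equal to its own id. After one exchange vertex 2
# picks up 1 from vertex 1 and vertex 3 picks up 2 from vertex 2; after the
# next exchange vertex 3 picks up 1. A final iteration finds nothing left to
# update, so the loop stops with all three vertices in component 1.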
plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
plpy.execute("""
CREATE TEMP TABLE {oldupdate} AS
SELECT {message}.{vertex_id},
MIN({message}.{component_id}) AS {component_id}
{grouping_cols_select}
FROM {message}
GROUP BY {group_by_clause} {vertex_id}
{distribution}
""".format(grouping_cols_select='' if not grouping_cols else
', {0}'.format(grouping_cols),
group_by_clause=grouping_cols_comma,
**locals()))
plpy.execute("DROP TABLE IF EXISTS {0}".format(toupdate))
plpy.execute("""
CREATE TEMP TABLE {toupdate} AS
SELECT {oldupdate}.{vertex_id},
{oldupdate}.{component_id}
{comma_oldupdate_prefixed_grouping_cols}
FROM {oldupdate}, {newupdate}
WHERE {oldupdate}.{vertex_id}={newupdate}.{vertex_id}
AND {oldupdate}.{component_id}<{newupdate}.{component_id}
{old_new_update_where_condition}
{distribution}
""".format(**locals()))
plpy.execute("""
UPDATE {newupdate} SET
{component_id}={toupdate}.{component_id}
FROM {toupdate}
WHERE {newupdate}.{vertex_id}={toupdate}.{vertex_id}
{new_to_update_where_condition}
""".format(**locals()))
plpy.execute("DROP TABLE IF EXISTS {0}".format(message))
plpy.execute("""
CREATE TEMP TABLE {message} AS
SELECT {edge_inverse}.{src} AS {vertex_id},
MIN({toupdate}.{component_id}) AS {component_id}
{comma_toupdate_prefixed_grouping_cols}
FROM {toupdate}, {edge_inverse}
WHERE {edge_inverse}.{dest} = {toupdate}.{vertex_id}
{edge_inverse_to_update_where_condition}
GROUP BY {edge_inverse}.{src} {comma_toupdate_prefixed_grouping_cols}
""".format(select_grouping_cols='' if not grouping_cols
else ', {0}'.format(grouping_cols),
**locals()))
plpy.execute("""
INSERT INTO {message}
SELECT {edge_table}.{dest} AS {vertex_id},
MIN({toupdate}.{component_id}) AS {component_id}
{comma_toupdate_prefixed_grouping_cols}
FROM {toupdate}, {edge_table}
WHERE {edge_table}.{src} = {toupdate}.{vertex_id}
{edge_to_update_where_condition}
GROUP BY {edge_table}.{dest} {comma_toupdate_prefixed_grouping_cols}
""".format(select_grouping_cols='' if not grouping_cols
else ', {0}'.format(grouping_cols),
**locals()))
plpy.execute("DROP TABLE {0}".format(oldupdate))
if grouping_cols:
nodes_to_update = plpy.execute("""
SELECT SUM(cnt) AS cnt_sum
FROM (
SELECT COUNT(*) AS cnt
FROM {toupdate}
GROUP BY {grouping_cols}
) t
""".format(**locals()))[0]["cnt_sum"]
else:
nodes_to_update = plpy.execute("""
SELECT COUNT(*) AS cnt FROM {toupdate}
""".format(**locals()))[0]["cnt"]
if not is_platform_pg():
# Drop intermediate table created for Greenplum
plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_inverse))
rename_table(schema_madlib, newupdate, out_table)
# Create the summary table. It records the input vertex table, the
# vertex id column and its type, and the grouping columns (if any).
plpy.execute("""
CREATE TABLE {out_table_summary} (
{grouping_cols_summary}
vertex_table TEXT,
vertex_id TEXT,
vertex_id_type TEXT
)
""".format(grouping_cols_summary='' if not grouping_cols else
'grouping_cols TEXT, ', **locals()))
vertex_id_type = get_expr_type(vertex_id, vertex_table)
plpy.execute("""
INSERT INTO {out_table_summary} VALUES
({grouping_cols_summary} '{vertex_table}', '{vertex_id}',
'{vertex_id_type}')
""".format(grouping_cols_summary='' if not grouping_cols else
"'{0}', ".format(grouping_cols), **locals()))
plpy.execute("DROP TABLE IF EXISTS {0},{1},{2},{3} ".
format(message, oldupdate, newupdate, toupdate))
# WCC Helper functions:
def extract_wcc_summary_cols(wcc_summary_table):
"""
WCC helper function to find all values stored in the summary table.
Args:
@param wcc_summary_table
Returns:
Dictionary containing the summary column names and their values. The
keys are 'vertex_table', 'vertex_id' and 'vertex_id_type', plus
'grouping_cols' if grouping columns were used.
"""
return plpy.execute("SELECT * FROM {wcc_summary_table} ".format(
**locals()))[0]
def preprocess_wcc_table_args(wcc_table, out_table):
"""
Validate wcc_table, its summary table and the output table. Read
the summary table and return its contents as a dictionary.
"""
validate_output_and_summary_tables(wcc_table, "WCC", out_table)
wcc_summary_table = add_postfix(wcc_table, "_summary")
return extract_wcc_summary_cols(wcc_summary_table)
def check_input_vertex_validity(wcc_args, vertices):
"""
Function to check that all given vertices are valid, i.e., present in
the WCC's original input vertex table. The check fails if even one of
the input vertices (when more than one is given) is not valid.
Args:
@param wcc_args (dict)
@param vertices (list)
Returns:
None. Raises an error (via _assert) if the vertex table does not exist
or if any vertex in the list is not present in it.
"""
vertex_table = wcc_args['vertex_table']
_assert(table_exists(vertex_table),
"Graph WCC: Input vertex table '{0}' does not exist.".format(
vertex_table))
vertex_col = wcc_args['vertex_id']
where_clause = ' OR '.join(["{0}='{1}'".format(vertex_col, v)
for v in vertices])
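# e.g. for vertex_col 'id' and vertices ['2', '5'] this builds the filter
# "id='2' OR id='5'"; the count below must then equal len(vertices).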
count = plpy.execute("""
SELECT COUNT(*) as count FROM (
SELECT 1 FROM {vertex_table}
WHERE {where_clause}
) t
""".format(**locals()))[0]['count']
_assert(count == len(vertices),
"Graph WCC: Invalid input vertex in {0}.".format(str(vertices)))
def create_component_cnts_table(wcc_table, cnts_out_table,
grouping_cols_comma):
"""
WCC helper function to create a table containing the number of vertices
per component.
Args:
@param wcc_table
@param cnts_out_table
@param grouping_cols_comma
Returns:
Creates a new table called cnts_out_table containing, per component (and
per group, if grouping columns are given), the component_id and its
vertex count as num_vertices.
"""
plpy.execute("""
CREATE TABLE {cnts_out_table} AS
SELECT {grouping_cols_select} component_id, COUNT(*) as num_vertices
FROM {wcc_table}
GROUP BY {group_by_clause} component_id
""".format(grouping_cols_select=grouping_cols_comma,
group_by_clause=grouping_cols_comma, **locals()))
def graph_wcc_largest_cpt(schema_madlib, wcc_table, largest_cpt_table,
**kwargs):
"""
WCC helper function that computes the largest weakly connected component
in each group (if grouping cols are defined)
Args:
@param wcc_table
@param largest_cpt_table
Returns:
Creates table largest_cpt_table containing the component_id of the
largest component and its vertex count (num_vertices). If grouping_cols
are defined, columns corresponding to the grouping_cols are also
created, and the largest component is computed per group.
"""
with MinWarning("warning"):
wcc_args = preprocess_wcc_table_args(wcc_table, largest_cpt_table)
# Create temp table containing the number of vertices in each
# component.
tmp_cnt_table = unique_string(desp='tmpcnt')
if 'grouping_cols' in wcc_args:
grouping_cols = wcc_args['grouping_cols']
else:
grouping_cols = ''
glist = split_quoted_delimited_str(grouping_cols)
grouping_cols_comma = '' if not grouping_cols else grouping_cols + ','
subq = unique_string(desp='q')
subt = unique_string(desp='t')
create_component_cnts_table(wcc_table, tmp_cnt_table,
grouping_cols_comma)
# Query to find ALL largest components within groups.
select_grouping_cols_subq = ''
groupby_clause_subt = ''
grouping_cols_join = ''
if grouping_cols:
select_grouping_cols_subq = get_table_qualified_col_str(subq, glist) + ','
groupby_clause_subt = ' GROUP BY {0}'.format(grouping_cols)
grouping_cols_join = ' AND ' + _check_groups(subq, subt, glist)
plpy.execute("""
CREATE TABLE {largest_cpt_table} AS
SELECT {select_grouping_cols_subq} {subq}.component_id,
{subt}.maxcnt AS num_vertices
FROM {tmp_cnt_table} AS {subq}
INNER JOIN (
SELECT {grouping_cols_select_subt}
MAX(num_vertices) AS maxcnt
FROM {tmp_cnt_table}
{groupby_clause_subt}
) {subt}
ON {subq}.num_vertices={subt}.maxcnt
{grouping_cols_join}
""".format(grouping_cols_select_subt=grouping_cols_comma,
**locals()))
# Drop temp table
plpy.execute("DROP TABLE IF EXISTS {0}".format(tmp_cnt_table))
def graph_wcc_histogram(schema_madlib, wcc_table, histogram_table, **kwargs):
"""
Retrieve Histogram of Vertices Per Connected Component
Args:
@param wcc_table
@param histogram_table
Returns:
Creates and populates histogram_table with the number of vertices per
component (represented by the column num_vertices). Columns corresponding
to grouping_cols are also created if defined.
"""
with MinWarning("warning"):
wcc_args = preprocess_wcc_table_args(wcc_table, histogram_table)
grouping_cols_comma = ''
if 'grouping_cols' in wcc_args:
grouping_cols_comma = wcc_args['grouping_cols'] + ', '
create_component_cnts_table(wcc_table, histogram_table,
grouping_cols_comma)
def graph_wcc_vertex_check(schema_madlib, wcc_table, vertex_pair, pair_table,
**kwargs):
"""
WCC helper function to check if two vertices belong to the same component.
Args:
@param wcc_table
@param vertex_pair
@param pair_table
Returns:
Creates and populates pair_table with all the components that contain
both vertices specified in the vertex_pair argument. Grouping columns
are included, if specified.
"""
with MinWarning("warning"):
wcc_args = preprocess_wcc_table_args(wcc_table, pair_table)
vertices = split_quoted_delimited_str(vertex_pair)
_assert(vertices and len(vertices) == 2,
"Graph WCC: Invalid vertex pair ({0}) input.".format(
vertex_pair))
check_input_vertex_validity(wcc_args, vertices)
grouping_cols_comma = ''
if 'grouping_cols' in wcc_args:
grouping_cols_comma = wcc_args['grouping_cols'] + ', '
subq = unique_string(desp='subq')
inner_select_clause = " SELECT {0} component_id ".format(
grouping_cols_comma)
inner_from_clause = " FROM {0} ".format(wcc_table)
inner_groupby_clause = " GROUP BY {0} component_id".format(
grouping_cols_comma)
plpy.execute("""
CREATE TABLE {pair_table} AS
SELECT {grouping_cols_comma} component_id
FROM (
{inner_select_clause}, 1
{inner_from_clause}
WHERE {vertex_id}='{vertex1}'
{inner_groupby_clause}
UNION ALL
{inner_select_clause}, 2
{inner_from_clause}
WHERE {vertex_id}='{vertex2}'
{inner_groupby_clause}
) {subq}
GROUP BY {grouping_cols_comma} component_id
HAVING COUNT(*)=2
""".format(vertex_id=wcc_args['vertex_id'],
vertex1=vertices[0], vertex2=vertices[1], **locals()))
def graph_wcc_reachable_vertices(schema_madlib, wcc_table, src,
reachable_vertices_table, **kwargs):
"""
WCC helper function to retrieve all vertices reachable from a vertex
Args:
@param wcc_table
@param src
@param reachable_vertices_table
Returns:
Creates and populates reachable_vertices_table with all the vertices
reachable from the src vertex, where reachability is within a
component. Grouping columns are included, if specified.
"""
with MinWarning("warning"):
wcc_args = preprocess_wcc_table_args(wcc_table,
reachable_vertices_table)
check_input_vertex_validity(wcc_args, split_quoted_delimited_str(src))
grouping_cols_comma = ''
grouping_cols = ''
if 'grouping_cols' in wcc_args:
grouping_cols = wcc_args['grouping_cols']
grouping_cols_comma = grouping_cols + ', '
vertex_id = wcc_args['vertex_id']
subq = unique_string(desp='subq')
glist = split_quoted_delimited_str(grouping_cols)
grouping_cols_join = '' if not grouping_cols else ' AND ' + \
_check_groups(wcc_table, subq, glist)
subq_grouping_cols = '' if not grouping_cols else \
get_table_qualified_col_str(subq, glist) + ', '
plpy.execute("""
CREATE TABLE {reachable_vertices_table} AS
SELECT {subq_grouping_cols} {subq}.component_id,
{wcc_table}.{vertex_id} AS dest
FROM {wcc_table}
INNER JOIN (
SELECT {grouping_cols_comma} component_id, {vertex_id}
FROM {wcc_table}
GROUP BY {vertex_id}, {grouping_cols_comma} component_id
HAVING {vertex_id}='{src}'
) {subq}
ON {wcc_table}.component_id={subq}.component_id
{grouping_cols_join}
WHERE {wcc_table}.{vertex_id} != '{src}'
""".format(**locals()))
def graph_wcc_num_cpts(schema_madlib, wcc_table, count_table, **kwargs):
"""
WCC helper function to count the number of connected components
Args:
@param wcc_table
@param count_table
Returns:
Creates and populates count_table with the total number of
components. If grouping_cols are involved, the number of components
is computed per group.
"""
with MinWarning("warning"):
wcc_args = preprocess_wcc_table_args(wcc_table, count_table)
grouping_cols = ''
grouping_cols_comma = ''
if 'grouping_cols' in wcc_args:
grouping_cols = wcc_args['grouping_cols']
grouping_cols_comma = grouping_cols + ', '
plpy.execute("""
CREATE TABLE {count_table} AS
SELECT {grouping_cols_comma}
COUNT(DISTINCT component_id) AS num_components
FROM {wcc_table}
{grp_by_clause}
""".format(grp_by_clause='' if not grouping_cols else
' GROUP BY {0}'.format(grouping_cols), **locals()))
def wcc_help(schema_madlib, message, **kwargs):
"""
Help function for wcc
Args:
@param schema_madlib
@param message: string, Help message string
@param kwargs
Returns:
String. Help/usage information
"""
if message is not None and \
message.lower() in ("usage", "help", "?"):
help_string = "Get from method below"
help_string = get_graph_usage(
schema_madlib,
'Weakly Connected Components',
"""out_table TEXT, -- Output table of weakly connected components
grouping_col TEXT -- Comma separated column names to group on
-- (DEFAULT = NULL, no grouping)
""") + """
Once the above function is used to obtain the out_table, it can be passed to
several other helper functions based on weakly connected components:
(1) To retrieve the largest connected component:
SELECT {schema_madlib}.graph_wcc_largest_cpt(
wcc_table TEXT, -- Name of the table that contains the WCC output.
largest_cpt_table TEXT -- Name of the output table that contains the
-- largest components details.
);
(2) To retrieve the histogram of vertices per connected component:
SELECT {schema_madlib}.graph_wcc_histogram(
wcc_table TEXT, -- Name of the table that contains the WCC output.
histogram_table TEXT -- Name of the output table that contains the
-- histogram of vertices per connected component.
);
(3) To check if two vertices belong to the same component:
SELECT {schema_madlib}.graph_wcc_vertex_check(
wcc_table TEXT, -- Name of the table that contains the WCC output.
vertex_pair TEXT, -- Pair of vertex IDs, separated by a comma.
pair_table TEXT -- Name of the output table that contains all the
-- components that contain the two vertices.
);
(4) To retrieve all vertices reachable from a vertex:
SELECT {schema_madlib}.graph_wcc_reachable_vertices(
wcc_table TEXT, -- Name of the table that contains the WCC output.
src TEXT, -- Initial source vertex.
reachable_vertices_table TEXT -- Name of the output table that
-- contains all vertices in a
-- component reachable from src.
);
(5) To count the number of connected components:
SELECT {schema_madlib}.graph_wcc_num_cpts(
wcc_table TEXT, -- Name of the table that contains the WCC output.
count_table TEXT -- Name of the output table that contains the count
-- of number of components.
);"""
else:
help_string = """
----------------------------------------------------------------------------
SUMMARY
----------------------------------------------------------------------------
Given a directed graph, a weakly connected component is a sub-graph of the
original graph where all vertices are connected to each other by some path,
ignoring the direction of edges. In the case of an undirected graph, a weakly
connected component is also a strongly connected component.
--
For an overview on usage, run:
SELECT {schema_madlib}.weakly_connected_components('usage');
"""
return help_string.format(schema_madlib=schema_madlib)
# ---------------------------------------------------------------------