blob: 78bb1b94edd5c4e681f413ad2d99def8c144cef1 [file] [log] [blame]
#!/usr/bin/env python
"""@namespace profile
@file profile.py_in
@brief profile Table profile using Sketch Based Estimators
"""
import plpy
# ##
# Lists of aggregate function templates to call for each column, keyed by:
# - bas_num    : basic numeric
# - all_num    : all numeric
# - bas_nonnum : basic non-numeric
# - all_nonnum : all non-numeric
# Use '()' as the column placeholder.
# ##
# Aggregate templates keyed by "<funclist>_<num|nonnum>".  Inside each template
# 'MADLIB_SCHEMA' and '#BUCKETS#' are textual placeholders and '()' marks the
# spot where the column name is inserted.
aggs = {
    'bas_num': [
        "MIN()",
        "MAX()",
        "AVG()",
        "MADLIB_SCHEMA.cmsketch_median(MADLIB_SCHEMA.cmsketch(),count())",
    ],
    'all_num': [
        "MIN()",
        "MAX()",
        "AVG()",
        "MADLIB_SCHEMA.cmsketch_median(MADLIB_SCHEMA.cmsketch(),count())",
        "MADLIB_SCHEMA.cmsketch_depth_histogram(MADLIB_SCHEMA.cmsketch(),#BUCKETS#)",
        "MADLIB_SCHEMA.cmsketch_width_histogram(MADLIB_SCHEMA.cmsketch(),MIN(),MAX(),#BUCKETS#)",
    ],
    'bas_nonnum': [
        "MADLIB_SCHEMA.fmsketch_dcount()",
    ],
    'all_nonnum': [
        "MADLIB_SCHEMA.fmsketch_dcount()",
        "MADLIB_SCHEMA.array_collapse(MADLIB_SCHEMA.mfvsketch_quick_histogram((),#BUCKETS#))",
        "MADLIB_SCHEMA.array_collapse(MADLIB_SCHEMA.mfvsketch_top_histogram((),#BUCKETS#))",
    ],
}
# ##
# @brief Main function that controls the execution
#
# @param madlib_schema Name of MADlib schema
# @param input_table Name of relation to run profile for
# @param funclist Prefix of the agg list to use: 'bas' (basic) or 'all'
# @param buckets Number of buckets for histogram functions
# ##
def profile( madlib_schema, input_table, funclist, buckets):
    """Validate the arguments, resolve the relation and return its profile.

    Parameters:
        madlib_schema: name of the MADlib schema; substituted for the
            'MADLIB_SCHEMA' placeholder in every aggregate template.
        input_table: relation to profile, either "table" or "schema.table".
            An unqualified name is resolved through information_schema and
            the first schema returned is used.
        funclist: prefix selecting the aggregate lists, i.e. the part of the
            `aggs` keys before '_num' / '_nonnum' ('bas' or 'all').
        buckets: number of histogram buckets; substituted for '#BUCKETS#'
            only when greater than zero.

    Returns:
        The list of row dictionaries produced by __get_profile_data.

    Raises:
        Reports through plpy.error when the relation cannot be found or when
        funclist does not select a known aggregate list.
    """
    # Validate parameter: funclist (fail early with a readable message
    # instead of a KeyError deep inside the query builder).
    if (funclist + '_num') not in aggs or (funclist + '_nonnum') not in aggs:
        plpy.error( "unknown function list type (" + funclist + ")\n")

    # Validate parameter: input_table
    if input_table.find('.') < 0:
        # Does it exist? What schema(s) is it in?
        # The UNION ALL branch appends a sentinel row with a NULL schema so
        # that rv[0] always exists, even when no matching relation is found.
        # Note the trailing space after "$1": without it the parameter marker
        # is fused with the GROUP keyword ("$1GROUP BY").
        plan = plpy.prepare( "SELECT 1 as order, table_schema as schema "
                           + "FROM information_schema.columns WHERE table_name = $1 "
                           + "GROUP BY 1,2 UNION ALL SELECT 2 as order, NULL as schema ORDER BY 1"
                           , ['text'] )
        rv = plpy.execute( plan, [input_table])
        if rv[0]['schema'] is None:
            plpy.error( "input table/view does not exists (" + input_table + ")\n")
        else:
            schema_name = rv[0]['schema']
            table_name = input_table
    else:
        # Split on the first dot only, so a name with several dots reaches the
        # existence check below instead of failing with an unpacking error.
        schema_name, table_name = input_table.split('.', 1)
        # Does it exist?
        plan = plpy.prepare( "SELECT count(*) as cnt FROM information_schema.columns WHERE "
                           + "table_schema = $1 AND table_name = $2"
                           , ['text', 'text'] )
        rv = plpy.execute( plan, [schema_name, table_name])
        if rv[0]['cnt'] == 0:
            plpy.error( "input table/view does not exists (" + schema_name + '.' + table_name + ")\n")

    # Prepare the lists of aggs.
    # Work on a per-call copy: rewriting the module-level templates in place
    # would consume the placeholders on the first call, so later calls with a
    # different schema or bucket count would silently reuse the old values.
    substitute_buckets = buckets is not None and buckets > 0
    prepared_aggs = {}
    for key, templates in aggs.items():
        funcs = [func.replace('MADLIB_SCHEMA', madlib_schema) for func in templates]
        if substitute_buckets:
            funcs = [func.replace('#BUCKETS#', str(buckets)) for func in funcs]
        prepared_aggs[key] = funcs

    # Get the lists of columns
    (numcols, non_numcols) = __catalog_columns( schema_name, table_name)

    # Build and run the query
    rowset = __get_profile_data( schema_name, table_name, numcols, non_numcols
                               , prepared_aggs, funclist, buckets)
    return rowset
# ##
# @brief Helper routine to query the database catalog and get column names for
# table "schema_name.table_name".
#
# @param schema_name Name of the schema
# @param table_name Name of the relation to run profile for
# @return List of columns divided into 2 groups (integer and non-integer)
# ##
def __catalog_columns( schema_name, table_name):
    """Return the column names of schema_name.table_name split into two groups.

    The split is driven by information_schema.columns.numeric_scale:
    columns whose numeric_scale is 0 form the first group, columns whose
    numeric_scale is NULL or greater than 0 form the second group.

    Parameters:
        schema_name: name of the schema holding the relation.
        table_name: name of the relation.

    Returns:
        A two-element list [numcols, non_numcols]; each element is a list of
        column names in ordinal position order.
    """
    # The names are passed as bound parameters rather than being formatted
    # into the statement text, so quotes in a name can neither break the
    # query nor inject SQL.  Only the fixed predicate text is interpolated.
    query_template = ( "select column_name from information_schema.columns "
                     + "where table_schema = $1 and table_name = $2 "
                     + "and %s "
                     + "order by ordinal_position" )

    def fetch_names( scale_predicate):
        plan = plpy.prepare( query_template % scale_predicate, ['text', 'text'])
        cur = plpy.execute( plan, [schema_name, table_name])
        return [c['column_name'] for c in cur]

    # Columns with numeric_scale = 0
    numcols = fetch_names( "numeric_scale = 0")
    # All remaining columns (no scale, or a fractional scale)
    non_numcols = fetch_names( "(numeric_scale is null or numeric_scale > 0)")

    return [numcols, non_numcols]
# ##
# @brief Builds the SQL query and runs it. Also builds the final rowset and
# populates it with data from the SQL results.
#
# @param schema Name of the schema
# @param table Name of relation to run profile for
# @param numcols List of numeric columns
# @param non_numcols List of non-numeric columns
# @param aggs Dictionary of agg function lists, keyed by '<funclist>_num' / '<funclist>_nonnum'
# @param funclist Prefix of the agg list to use: 'bas' (basic) or 'all'
# @param buckets Number of buckets for histogram functions
# ##
def __quote_ident( name):
    """Return name as a double-quoted SQL identifier (embedded quotes doubled)."""
    return '"' + name.replace('"', '""') + '"'


def __get_profile_data( schema, table, numcols, non_numcols, aggs, funclist, buckets):
    """Build one SELECT computing every aggregate, run it and return the rows.

    Parameters:
        schema: name of the schema holding the relation.
        table: name of the relation to profile.
        numcols: columns profiled with aggs[funclist + '_num'].
        non_numcols: columns profiled with aggs[funclist + '_nonnum'].
        aggs: dictionary mapping '<funclist>_num' / '<funclist>_nonnum' to
            lists of aggregate templates; '()' in a template marks where the
            column goes.
        funclist: prefix selecting which lists of `aggs` are used.
        buckets: accepted for interface compatibility; not used here.

    Returns:
        A list of dictionaries with the keys 'schema_name', 'table_name',
        'column_name', 'function', 'id' and 'value'.  The entry with id 0 is
        the row count; each other entry is one (column, aggregate) pair, and
        'function' holds the aggregate template as found in `aggs`.
    """
    # Entry 0 is always the plain row count.
    select_list = ['count(*) AS "0"']
    rowset = [ { 'schema_name': schema
               , 'table_name': table
               , 'column_name': '*'
               , 'function': 'COUNT()'
               , 'id': 0
               , 'value': None} ]

    # Numeric cols first, then non-numeric cols; ids are assigned in that
    # order and double as the result column aliases.
    i = 0
    for columns, suffix in ((numcols, '_num'), (non_numcols, '_nonnum')):
        for c in columns:
            # Quote the column so mixed-case names and reserved words work.
            quoted = __quote_ident( c)
            for a in aggs[ funclist + suffix]:
                i += 1
                expr = a.replace('%%', quoted).replace('()', '(' + quoted + ')')
                select_list.append( expr + ' AS "' + str(i) + '"')
                rowset.append( { 'schema_name': schema
                               , 'table_name': table
                               , 'column_name': c
                               , 'function': a
                               , 'id': i
                               , 'value': None} )

    # Qualify the relation with its schema: the caller resolved a specific
    # schema, and an unqualified name could resolve to a different relation
    # (or to none) depending on the session's search_path.
    sql = ( 'SELECT ' + ', '.join( select_list)
          + ' FROM ' + __quote_ident( schema) + '.' + __quote_ident( table) + ';' )

    # Run the SQL
    rv = plpy.execute( sql)

    # Go through the results and fill in the values
    result = rv[0]
    for row in rowset:
        row['value'] = result[ str(row['id']) ]
    return rowset