| #!/usr/bin/env python |
| |
| """@namespace profile |
| |
| @file profile.py_in |
| @brief profile Table profile using Sketch Based Estimators |
| """ |
| |
| import plpy |
| |
# ##
# Aggregate expression templates to run for each column, keyed by
# "<funclist>_<column kind>":
#   - bas_num    : basic set for numeric columns
#   - all_num    : full set for numeric columns
#   - bas_nonnum : basic set for non-numeric columns
#   - all_nonnum : full set for non-numeric columns
# Placeholders inside the templates:
#   - '()'            : the column (replaced by '(<column>)')
#   - 'MADLIB_SCHEMA' : name of the MADlib schema
#   - '#BUCKETS#'     : number of histogram buckets
# ##
aggs = {
    'bas_num': [
        "MIN()",
        "MAX()",
        "AVG()",
        "MADLIB_SCHEMA.cmsketch_median(MADLIB_SCHEMA.cmsketch(),count())",
    ],
    'all_num': [
        "MIN()",
        "MAX()",
        "AVG()",
        "MADLIB_SCHEMA.cmsketch_median(MADLIB_SCHEMA.cmsketch(),count())",
        "MADLIB_SCHEMA.cmsketch_depth_histogram(MADLIB_SCHEMA.cmsketch(),#BUCKETS#)",
        "MADLIB_SCHEMA.cmsketch_width_histogram(MADLIB_SCHEMA.cmsketch(),MIN(),MAX(),#BUCKETS#)",
    ],
    'bas_nonnum': [
        "MADLIB_SCHEMA.fmsketch_dcount()",
    ],
    'all_nonnum': [
        "MADLIB_SCHEMA.fmsketch_dcount()",
        "MADLIB_SCHEMA.array_collapse(MADLIB_SCHEMA.mfvsketch_quick_histogram((),#BUCKETS#))",
        "MADLIB_SCHEMA.array_collapse(MADLIB_SCHEMA.mfvsketch_top_histogram((),#BUCKETS#))",
    ],
}
| |
| |
# ##
# @brief Main function that controls the execution
#
# Validates that the input relation exists, resolves the schema it lives in,
# expands the placeholders of the module-level agg templates and delegates
# to __catalog_columns() and __get_profile_data().
#
# @param madlib_schema Name of MADlib schema
# @param input_table Name of relation to run profile for, either "table"
#        or "schema.table"
# @param funclist Type of agg list to use; it is used as the prefix of the
#        agg list keys, which are defined for 'bas' and 'all'
# @param buckets Number of buckets for histogram functions; only
#        substituted into the templates when greater than 0
# @return Rowset produced by __get_profile_data()
# ##
def profile(madlib_schema, input_table, funclist, buckets):

    # Validate parameter: input_table
    if '.' not in input_table:
        # Does it exist? What schema(s) is it in?
        # The UNION ALL branch appends a NULL sentinel row that sorts last,
        # so the first row has a NULL schema only when no relation matched.
        plan = plpy.prepare(
            "SELECT 1 as order, table_schema as schema "
            "FROM information_schema.columns WHERE table_name = $1 "
            "GROUP BY 1,2 UNION ALL SELECT 2 as order, NULL as schema ORDER BY 1",
            ['text'])
        rv = plpy.execute(plan, [input_table])
        if rv[0]['schema'] is None:
            plpy.error("input table/view does not exists (" + input_table + ")\n")
        else:
            schema_name = rv[0]['schema']
            table_name = input_table
    else:
        # Split on the first dot only, so a name with several dots reaches
        # the existence check below instead of failing with a ValueError.
        schema_name, table_name = input_table.split('.', 1)
        # Does it exist?
        plan = plpy.prepare(
            "SELECT count(*) as cnt FROM information_schema.columns WHERE "
            "table_schema = $1 AND table_name = $2",
            ['text', 'text'])
        rv = plpy.execute(plan, [schema_name, table_name])
        if rv[0]['cnt'] == 0:
            plpy.error("input table/view does not exists ("
                       + schema_name + '.' + table_name + ")\n")

    # Prepare the lists of aggs.
    # Work on a per-call copy: substituting into the module-level templates
    # would consume the placeholders, making later calls in the same session
    # reuse the schema and bucket count of the first call.
    resolved_aggs = {}
    for key in aggs:
        funcs = [func.replace('MADLIB_SCHEMA', madlib_schema) for func in aggs[key]]
        if buckets is not None and buckets > 0:
            funcs = [func.replace('#BUCKETS#', str(buckets)) for func in funcs]
        resolved_aggs[key] = funcs

    # Get the lists of columns
    (numcols, non_numcols) = __catalog_columns(schema_name, table_name)

    # Build and run the query
    rowset = __get_profile_data(schema_name, table_name, numcols, non_numcols,
                                resolved_aggs, funclist, buckets)

    return rowset
| |
# ##
# @brief Helper routine to query the database catalog and get column names for
#        table "schema_name.table_name".
#
# Columns are grouped by information_schema.columns.numeric_scale:
# numeric_scale = 0 goes to the first group, numeric_scale NULL or > 0 to
# the second. Both groups are ordered by ordinal_position.
#
# @param schema_name Name of the schema
# @param table_name Name of the relation to run profile for
# @return Two-element list [numcols, non_numcols] of column name lists
#         (integer and non-integer)
# ##
def __catalog_columns(schema_name, table_name):

    # Names are passed as query parameters rather than interpolated into the
    # SQL text, so names containing quotes cannot break or alter the query.
    query = ("SELECT column_name FROM information_schema.columns "
             "WHERE table_schema = $1 AND table_name = $2 "
             "AND %s "
             "ORDER BY ordinal_position")
    args = [schema_name, table_name]

    # Fetch integer column names from table
    plan = plpy.prepare(query % "numeric_scale = 0", ['text', 'text'])
    numcols = [row['column_name'] for row in plpy.execute(plan, args)]

    # Fetch non-integer column names from table
    plan = plpy.prepare(
        query % "(numeric_scale IS NULL OR numeric_scale > 0)",
        ['text', 'text'])
    non_numcols = [row['column_name'] for row in plpy.execute(plan, args)]

    return [numcols, non_numcols]
| |
# ##
# @brief Builds the SQL query and runs it. Also builds the final rowset and
#        populates it with data from the SQL results.
#
# A single SELECT is issued against "schema"."table". Output column "0" is
# count(*); every (column, agg) pair gets the next integer as its alias,
# numeric columns first, and the same integer is stored as the row's 'id'.
#
# @param schema Name of the schema
# @param table Name of relation to run profile for
# @param numcols List of numeric columns
# @param non_numcols List of non-numeric columns
# @param aggs Dictionary of agg function lists, keyed by
#        funclist + '_num' / funclist + '_nonnum'
# @param funclist Type of agg list to use (prefix of the aggs keys)
# @param buckets Number of buckets for histogram functions (not used here;
#        kept for interface compatibility)
# @return List of dictionaries with keys schema_name, table_name,
#         column_name, function, id and value; 'function' holds the agg
#         template as found in aggs, 'value' the query result
# ##
def __get_profile_data(schema, table, numcols, non_numcols, aggs, funclist, buckets):

    def quote_ident(name):
        # Double-quote an identifier so mixed-case or special-character
        # names are referenced exactly as the catalog reports them.
        return '"' + name.replace('"', '""') + '"'

    select_items = ['count(*) AS "0"']

    # Initialize the tuple dictionary
    rowset = [{'schema_name': schema,
               'table_name': table,
               'column_name': '*',
               'function': 'COUNT()',
               'id': 0,
               'value': None}]

    next_id = 0
    # Numeric cols first, then non-numeric cols
    for columns, suffix in ((numcols, '_num'), (non_numcols, '_nonnum')):
        for col in columns:
            quoted_col = quote_ident(col)
            for template in aggs[funclist + suffix]:
                next_id += 1
                expr = (template.replace('%%', quoted_col)
                                .replace('()', '(' + quoted_col + ')'))
                select_items.append(expr + ' AS "' + str(next_id) + '"')
                rowset.append({'schema_name': schema,
                               'table_name': table,
                               'column_name': col,
                               'function': template,
                               'id': next_id,
                               'value': None})

    # Qualify the relation with its schema so the query reads the same
    # relation whose columns were looked up, regardless of search_path.
    sql = ('SELECT ' + ', '.join(select_items)
           + ' FROM ' + quote_ident(schema) + '.' + quote_ident(table) + ';')

    # Run the SQL
    rv = plpy.execute(sql)

    # Go through the results and fill in the values
    result = rv[0]
    for row in rowset:
        row['value'] = result[str(row['id'])]

    return rowset