| import plpy |
| import math |
| |
| from utilities.validate_args import input_tbl_valid, output_tbl_valid |
| from utilities.validate_args import cols_in_tbl_valid |
| from utilities.utilities import py_list_to_sql_string |
| |
| |
| class Summarizer: |
| |
| def __init__(self, schema_madlib, source_table, output_table, |
| target_cols, grouping_cols, distinctify, get_quartiles, |
| xtileify='Exact', ntile_array=None, how_many_mfv=10, |
| get_mfv_quick=False): |
| self._schema_madlib = schema_madlib |
| self._source_table = source_table |
| self._output_table = output_table |
| self._grouping_cols = grouping_cols |
| self._target_cols = target_cols |
| self._distinctify = distinctify |
| self._get_quartiles = get_quartiles |
| self._xtileify = xtileify |
| self._ntile_array = ntile_array |
| self._how_many_mfv = how_many_mfv |
| self._get_mfv_quick = get_mfv_quick |
| self._columns = None |
| self._column_names = None |
| self._delimiter = '_.*.&.!.!.&.*_' |
| |
| def _populate_columns(self): |
| if self._target_cols: |
| lower_target_cols = [each_col.lower() for each_col in self._target_cols |
| if '"' not in each_col] |
| target_selection = "AND attname = ANY({0})".format( |
| py_list_to_sql_string(lower_target_cols, array_type="text")) |
| else: |
| target_selection = "" |
| self._columns = plpy.execute(""" |
| SELECT |
| quote_ident(attname) AS attname, |
| typname, |
| attnum |
| FROM |
| pg_attribute a |
| JOIN |
| pg_type t |
| ON (a.atttypid=t.oid) |
| WHERE attrelid = '{tbl}'::regclass |
| AND attnum > 0 |
| AND not attisdropped |
| {target_selection} |
| ORDER BY attnum |
| """.format(tbl=self._source_table, |
| target_selection=target_selection)) |
| self._column_names = [col['attname'] for col in self._columns] |
| |
| # ----------------------------------------------------------------------- |
| # Argument validation functions |
| # ----------------------------------------------------------------------- |
| def _validate_table_names(self): |
| """ |
| Validate the required arguments for the summary function |
| """ |
| # source table |
| if self._source_table is None or self._source_table.strip() == '': |
| plpy.error("Summary error: Invalid data table name!") |
| input_tbl_valid(self._source_table, "Summary") |
| |
| # output table |
| output_tbl_valid(self._output_table, "Summary") |
| |
| def _validate_required_cols(self, required_cols): |
| if required_cols is not None and required_cols != [None]: |
| clean_cols = [c for c in required_cols if c is not None] |
| cols_in_tbl_valid(self._source_table, clean_cols, "Summary") |
| |
| def _validate_ntile_array(self): |
| if self._ntile_array is not None: |
| if len(self._ntile_array) == 0: |
| plpy.error(""" |
| Summary -- Invalid parameter: ntile_array is empty.""") |
| for ntile in self._ntile_array: |
| if ntile < 0 or ntile > 1.0: |
| plpy.error(""" |
| Summary -- Invalid parameter: Values in ntile_array |
| should be in the range [0.0, 1.0]""") |
| |
| def _adjust_cols(self): |
| # if #cols == 1, then it should not appear in the grouping_cols |
| if len(self._column_names) == 1 and \ |
| self._column_names[0] in self._grouping_cols: |
| self._grouping_cols.remove(self._column_names[0]) |
| |
| def _validate_paras(self): |
| """ |
| Validate all parameters in the class |
| """ |
| self._validate_table_names() |
| self._validate_required_cols(self._target_cols) |
| self._validate_required_cols(self._grouping_cols) |
| self._validate_ntile_array() |
| if self._how_many_mfv is None or self._how_many_mfv <= 0: |
| plpy.error(""" |
| Summary -- Invalid parameter: Number of most frequent values |
| required should be positive""") |
| |
| |
| # ----- End of argument validation functions ----------------------------- |
| |
| def _build_subquery(self, group_var, cols): |
| """ |
| Returns a subquery of statistics for target columns for a single |
| grouping variable |
| """ |
| # ensure group_var does not suffer from bad input from user |
| group_var = group_var.lower() if group_var and '"' not in group_var else group_var |
| args = {'source_table': self._source_table} |
| # Exclude the grouping_cols variable from the list of columns to |
| # report statistics on |
| cols = filter(lambda x: x['attname'] != group_var, cols) |
| if group_var: |
| args['group_value'] = "{schema_madlib}.__to_char(%s)" % group_var |
| args['group_var'] = "'%s'" % group_var |
| args['group_expr'] = "\n GROUP BY %s" % group_var |
| else: |
| args['group_value'] = "NULL" |
| args['group_var'] = "NULL" |
| args['group_expr'] = "" |
| args['column_names'] = ','.join(["'%s'" % c['attname'] for c in cols]) |
| args['column_types'] = ','.join(["'%s'" % c['typname'] for c in cols]) |
| args['column_number'] = ','.join([str(c['attnum']) for c in cols]) |
| if self._distinctify is 'Estimated': |
| args['distinct_columns'] = ','.join(["{schema_madlib}.fmsketch_dcount(%s)" % c['attname'] for c in cols]) |
| elif self._distinctify is 'Exact': |
| args['distinct_columns'] = ','.join(["count(distinct %s)" % c['attname'] for c in cols]) |
| else: |
| args['distinct_columns'] = ','.join(["NULL" for c in cols]) |
| args['missing_columns'] = ','.join(["sum(case when %s is null then 1 else 0 end)" % ( |
| c['attname']) for c in cols]) |
| args['blank_columns'] = ','.join(["sum(case when {0} similar to E'\\\\W*' \ |
| then 1 else 0 end)".format(c['attname']) |
| if c['typname'] in ('varchar', 'bpchar', 'text', 'character varying') |
| else 'NULL' for c in cols]) |
| # ------ Helper sub-functions ------ |
| # types are obtained from pg_attribute (hence different from those |
| # obtained from information_schema.columns) |
| numeric_types = ('int2', 'int4', 'int8', 'float4', 'float8', 'numeric') |
| |
| def numeric_type(operator, datatype): |
| if datatype['typname'] in numeric_types: |
| return '%s(%s)' % (operator, datatype['attname']) |
| return "NULL" |
| |
| def minmax_type(minmax, c): |
| if c['typname'] in numeric_types: |
| return '%s(%s)' % (minmax, c['attname']) |
| if c['typname'] in ('varchar', 'bpchar', 'text'): |
| return "%s(length(%s))" % (minmax, c['attname']) |
| return "NULL" |
| |
| def xtile_type(xtile, c): |
| if self._xtileify is 'Exact': |
| if c['typname'] in numeric_types: |
| return "percentile_cont(%s) WITHIN GROUP (ORDER BY %s)" % (xtile, c['attname']) |
| if self._xtileify is 'Estimated': |
| return "NULL" # NOT YET IMPLEMENTED |
| return "NULL" |
| |
| def mfv_type(get_count, c): |
| slicing = ('0:0', '1:1')[get_count] |
| mfv_method = ('mfvsketch_top_histogram', |
| 'mfvsketch_quick_histogram')[self._get_mfv_quick] |
| return """ |
| array_to_string(({schema_madlib}.{mfv_method}( |
| {column},{topk}))[0:{topk}-1][{slice}], |
| '{delimiter}')""".format( |
| schema_madlib=self._schema_madlib, |
| mfv_method=mfv_method, |
| column=c['attname'], |
| topk=self._how_many_mfv, |
| slice=slicing, |
| delimiter=self._delimiter) |
| # ------ End of Helper sub-functions ------ |
| args['mean_columns'] = ','.join([numeric_type('avg', c) for c in cols]) |
| args['var_columns'] = ','.join([numeric_type('variance', c) for c in cols]) |
| args['min_columns'] = ','.join([minmax_type('min', c) for c in cols]) |
| |
| args['q1_columns'] = ','.join([xtile_type(0.25, c) |
| if self._get_quartiles |
| else 'NULL' for c in cols]) |
| args['q2_columns'] = ','.join([xtile_type(0.50, c) |
| if self._get_quartiles |
| else 'NULL' for c in cols]) |
| args['q3_columns'] = ','.join([xtile_type(0.75, c) |
| if self._get_quartiles |
| else 'NULL' for c in cols]) |
| |
| args['max_columns'] = ','.join([minmax_type('max', c) for c in cols]) |
| |
| args['ntile_columns'] = "array_to_string(array[NULL], ',')" |
| if self._ntile_array: |
| args['ntile_columns'] = ",".join([ |
| "array_to_string(array[" + |
| ",".join([xtile_type(xtile, c) for xtile in self._ntile_array]) + |
| "], ',')" for c in cols]) |
| args['mfv_value'] = ','.join([mfv_type(False, c) for c in cols]) |
| args['mfv_count'] = ','.join([mfv_type(True, c) for c in cols]) |
| subquery = """ |
| SELECT |
| {group_var}::text as group_by, |
| {group_value}::text as group_by_value, |
| array[{column_names}]::text[] as target_column, |
| array[{column_types}]::text[] as datatype, |
| array[{column_number}]::integer[] as colnum, |
| count(*)::bigint as rowcount, |
| array[{mean_columns}]::float8[] as mean, |
| array[{var_columns}]::float8[] as variance, |
| array[{distinct_columns}]::bigint[] as distinct_values, |
| array[{missing_columns}]::bigint[] as missing_values, |
| array[{blank_columns}]::bigint[] as blank_values, |
| array[{min_columns}]::float8[] as min, |
| array[{q1_columns}]::float8[] as first_quartile, |
| array[{q2_columns}]::float8[] as median, |
| array[{q3_columns}]::float8[] as third_quartile, |
| array[{ntile_columns}]::text[] as ntiles, |
| array[{max_columns}]::float8[] as max, |
| array[{mfv_value}]::text[] as mfv_value, |
| array[{mfv_count}]::text[] as mfv_count |
| FROM {source_table}{group_expr} |
| """.format(**args).format(schema_madlib=self._schema_madlib) |
| return subquery |
| |
| def _build_inner_query(self, group_val, cols): |
| subquery = self._build_subquery(group_val, cols) |
| query = """ |
| SELECT |
| group_by, |
| group_by_value, |
| target_column, |
| datatype, |
| colnum, |
| rowcount, |
| distinct_values, |
| missing_values, |
| blank_values, |
| distinct_values::float8 / rowcount as fraction_distinct_values, |
| missing_values::float8 / rowcount as fraction_missing_values, |
| blank_values::float8 / rowcount as fraction_blank_values, |
| mean, |
| variance, |
| min, |
| first_quartile, |
| median, |
| third_quartile, |
| ntiles, |
| max, |
| string_to_array(mfv_value, '{delimiter}') as mfv_value, |
| string_to_array(mfv_count, '{delimiter}') as mfv_count |
| FROM |
| ( |
| SELECT |
| group_by, |
| group_by_value, |
| unnest(target_column) AS target_column, |
| unnest(datatype) AS datatype, |
| unnest(colnum) AS colnum, |
| rowcount, |
| unnest(distinct_values) as distinct_values, |
| unnest(missing_values) as missing_values, |
| unnest(blank_values) as blank_values, |
| unnest(mean) as mean, |
| unnest(variance) as variance, |
| unnest(min) as min, |
| unnest(first_quartile) as first_quartile, |
| unnest(median) as median, |
| unnest(third_quartile) as third_quartile, |
| string_to_array(unnest(ntiles), ',') as ntiles, |
| unnest(max) as max, |
| unnest(mfv_value) as mfv_value, |
| unnest(mfv_count) as mfv_count |
| FROM ({subquery}) q1 |
| ) q2 |
| """.format(schema_madlib=self._schema_madlib, subquery=subquery, delimiter=self._delimiter) |
| return query |
| |
| def _build_query(self, group_val, cols, create_table): |
| query = self._build_inner_query(group_val, cols) |
| distinct_values = '' |
| if self._distinctify != 'Skip': |
| distinct_values = """ |
| (CASE WHEN rowcount < distinct_values THEN rowcount |
| ELSE distinct_values |
| END) AS distinct_values,""" |
| |
| first_quartile = '' |
| median = '' |
| third_quartile = '' |
| if self._get_quartiles and self._xtileify != 'Skip': |
| first_quartile = 'first_quartile,' |
| median = 'median,' |
| third_quartile = 'third_quartile,' |
| |
| ntiles = '' |
| if self._ntile_array and self._xtileify != 'Skip': |
| ntiles = 'ntiles::float8[] as quantile_array,' |
| |
| if create_table: |
| final_query = 'CREATE TABLE {output_table} AS ' |
| else: |
| final_query = 'INSERT INTO {output_table} ' |
| final_query += """ |
| SELECT |
| group_by as group_by, |
| group_by_value as group_by_value, |
| target_column as target_column, |
| colnum as column_number, |
| datatype as data_type, |
| rowcount as row_count, |
| {distinct_values} |
| missing_values as missing_values, |
| blank_values as blank_values, |
| (missing_values::float8 / rowcount) as fraction_missing, |
| (blank_values::float8 / rowcount) as fraction_blank, |
| mean, |
| variance, |
| min, |
| max, |
| {first_quartile} |
| {median} |
| {third_quartile} |
| {ntiles} |
| mfv_value as most_frequent_values, |
| mfv_count::bigint[] as mfv_frequencies |
| FROM |
| ( |
| {query} |
| |
| ) q3 |
| """ |
| if create_table: |
| final_query += """ |
| m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (group_by)')""" |
| |
| return final_query.format( |
| output_table=self._output_table, |
| query=query, |
| distinct_values=distinct_values, |
| first_quartile=first_quartile, |
| median=median, |
| third_quartile=third_quartile, |
| ntiles=ntiles) |
| |
| def run(self): |
| self._validate_paras() |
| self._populate_columns() |
| if not self._columns: |
| plpy.error("Summary error: Invalid column names {0} ".format(self._target_cols)) |
| self._adjust_cols() |
| try: |
| plpy.execute('DROP TABLE IF EXISTS {output_table}'.format( |
| output_table=self._output_table)) |
| create_table = True |
| except Exception: |
| plpy.error("Summary error: Invalid output table name " + self._output_table) |
| |
| # set a maximum number of columns to avoid out-of-memory |
| # issues when a lot of columns are computed concurrently |
| # We repeat the query multiple times till computation is complete for |
| # all columns. |
| actual_nCols = len(self._columns) |
| max_nCols = 15 |
| # ensuring an even spread of columns in each repeated attempt. For eg. |
| # if max_nCols = 15, to simulate 31 cols we break it down as [11, 11, 9] |
| # instead of [15, 15, 1]. This ensures low memory usage in each subquery |
| nSplits = math.ceil(float(actual_nCols) / max_nCols) |
| subset_nCols = int(math.ceil(actual_nCols / nSplits)) |
| subset_columns = [self._columns[pos: pos + subset_nCols] |
| for pos in range(0, actual_nCols, subset_nCols)] |
| for cols in subset_columns: |
| for group_val in self._grouping_cols: |
| # summary treats the comma-separated list of grouping_cols as |
| # "group by each" val in the list |
| plpy.execute(self._build_query(group_val, cols, create_table)) |
| create_table = False |