| # coding=utf-8 |
| """ |
| @file control.py_in |
| |
| @brief controller classes (e.g. iteration controller) |
| |
| @namespace utilities |
| |
| @brief driver functions shared by modules |
| """ |
| |
| import plpy |
| from control import MinWarning |
| from utilities import __mad_version |
| from utilities import unique_string |
# Module-private alias for unique_string; GroupIterationController uses it
# to generate the col_grp_null and col_grp_key names.
_unique_string = unique_string

# Version helper from utilities. select_vecfunc() supplies the callable that
# GroupIterationController.evaluate applies to each _groups array value.
# NOTE(review): presumably this picks a converter suited to the database
# version in use - confirm against utilities.
version_wrapper = __mad_version()
mad_vec = version_wrapper.select_vecfunc()
| |
| m4_changequote(`<!', `!>') |
| |
| |
class GroupIterationController:

    """
    @brief Abstraction for implementing driver functions in PL/Python

    This class encapsulates handling of the inter-iteration state. The design
    goal is to avoid any conversion between backend-native types and those of
    procedural languages like PL/Python. Therefore, the expectation is that

    *** all "template" parameters are passed as PL/Python arguments ***,

    whereas non-template arguments are provided in an argument table. Here,
    "template" arguments are those parameters that cannot be SQL parameters,

    *** such as table and column names ***.

    This class assumes a transition state always has its status indicator
    in the last element. 0 means in progress, 1 means completed, > 1 means
    abnormal. Perhaps a C++ UDF should be added for extracting the status.

    The inter-state iteration table contains three columns:
    - <tt>_grouping_cols</tt> - List of columns that are provided as grouping
        arguments
    - <tt>_iteration INTEGER</tt> - The 0-based iteration number
    - <tt>_state <em>self.kwargs.stateType</em></tt> - The state (after
        iteration @c _iteration)
    """

    def __init__(self, rel_args, rel_state, stateType,
                 temporaryTables=True,
                 schema_madlib="MADLIB_SCHEMA_MISSING",
                 verbose=False,
                 grouping_str="NULL",
                 col_grp_iteration="_iteration",
                 col_grp_state="_state",
                 **kwargs):
        """
        Collect all template parameters used to build the SQL statements.

        @param rel_args Name of the argument table
        @param rel_state Name of the inter-iteration state table
        @param stateType SQL type of the state column; may contain a
            <tt>{schema_madlib}</tt> placeholder
        @param temporaryTables If true, the state table is created as a
            temporary table and both table names are qualified with pg_temp
        @param schema_madlib Schema name substituted into the SQL templates
        @param verbose If true, every statement is echoed via plpy.notice
        @param grouping_str SQL expression list compared against the group
            values when a group is marked as completed
        @param col_grp_iteration Name of the iteration column
        @param col_grp_state Name of the state column
        @param kwargs Further template parameters. <tt>rel_source</tt> is
            required by the SQL templates of this class.
            <tt>grouping_col</tt> is a comma-separated column list; None or
            missing means that no grouping is used.
        """
        self.temporaryTables = temporaryTables
        self.verbose = verbose
        self.inWith = False
        self.iteration = -1
        self.grouping_str = grouping_str
        # self.kwargs is the very dict passed in, so the entries added below
        # sit next to the caller-provided template parameters.
        self.kwargs = kwargs
        schema_prefix = 'pg_temp.' if temporaryTables else ''
        self.kwargs.update(
            unqualified_rel_state=rel_state,
            rel_args=schema_prefix + rel_args,
            rel_state=schema_prefix + rel_state,
            stateType=stateType.format(schema_madlib=schema_madlib),
            schema_madlib=schema_madlib,
            grouping_str=self.grouping_str,
            col_grp_null=_unique_string(),
            col_grp_key=_unique_string(),
            col_grp_iteration=col_grp_iteration,
            col_grp_state=col_grp_state
        )
        # A missing grouping_col is treated like an explicit None.
        requested_grouping = self.kwargs.get("grouping_col")
        self.is_group_null = requested_grouping is None
        if self.is_group_null:
            grouping_col = "Null"
            using_str = "on True"
        else:
            grouping_col = requested_grouping
            using_str = "using ({0})".format(grouping_col)
        self.kwargs["grouping_col"] = grouping_col
        self.kwargs["using_str"] = using_str
        self.grouping_col = grouping_col

    @staticmethod
    def _group_array_literal(group_vector):
        """
        Render group values as an SQL text-array constructor.

        @param group_vector Iterable of group values
        @return String of the form <tt>array['a'::text,NULL::text]</tt>

        Empty values and the string 'null' (any case) are rendered as SQL
        NULL. Single quotes inside a value are doubled so that the value
        stays a single string literal. Backslashes are passed through
        unchanged.
        """
        rendered = []
        for each_grp in group_vector:
            if not each_grp or str(each_grp).lower() == 'null':
                # NULL values should be output as NULL instead of
                # as a string 'NULL'
                rendered.append("NULL::text")
            else:
                escaped = str(each_grp).replace("'", "''")
                rendered.append("'" + escaped + "'::text")
        return "array[" + ",".join(rendered) + "]"

    def __enter__(self):
        """
        Create the state table with one row per group at iteration 0.

        The state of every initial row is NULL. With grouping columns, an
        error is raised through plpy.error if any grouping value is NULL.
        Finally a primary key on (iteration column, grouping columns) is
        added.

        @return self
        """
        with MinWarning('warning'):
            # currently assuming that groups is passed as a valid array
            if self.is_group_null:
                group_col = "NULL::integer as {col_grp_null}".format(**self.kwargs)
                groupby_str = "{col_grp_null}".format(**self.kwargs)
                primary_str = ""
            else:
                group_col = "{grouping_col}".format(**self.kwargs)
                groupby_str = group_col
                primary_str = ", {grouping_col}".format(**self.kwargs)

            self.runSQL(
                """
                drop table if exists {rel_state};
                create {temp} table {unqualified_rel_state} as (
                    select
                        {group_col},
                        0::integer as {col_grp_iteration},
                        Null::{stateType} as {col_grp_state}
                    from {rel_source}
                    group by {groupby_str}
                );
                m4_ifdef(<!__POSTGRESQL__!>, <!!>, <!
                alter table {rel_state} set distributed by ({col_grp_iteration} {primary_str});
                !>)
                """.format(group_col=group_col, groupby_str=groupby_str, primary_str=primary_str,
                           temp='TEMPORARY' if self.temporaryTables else '', **self.kwargs))

            # The NULL check only matters when grouping columns exist, so
            # the count query is skipped otherwise.
            if primary_str:
                null_test = " or ".join(
                    g.strip() + " is NULL"
                    for g in self.kwargs['grouping_col'].split(","))
                null_count = plpy.execute(
                    """
                    select count(*) from {rel_state} where {null_test}
                    """.format(null_test=null_test, **self.kwargs))[0]['count']
                if null_count != 0:
                    plpy.error("Grouping error: at least one of the grouping columns contains NULL values!"
                               " Please filter out those NULL values.")

            self.runSQL("alter table {rel_state} add primary key "
                        "({col_grp_iteration} {primary_str})".
                        format(primary_str=primary_str, **self.kwargs))
        self.inWith = True
        return self

    def __exit__(self, type, value, tb):
        """Mark the controller as no longer inside a with block."""
        self.inWith = False

    def runSQL(self, sql):
        """
        Execute a statement, echoing it first when verbose is set.

        @param sql SQL text to execute
        @return Result of plpy.execute
        """
        if self.verbose:
            plpy.notice(sql)
        return plpy.execute(sql)

    def evaluate(self, expression):
        """
        Evaluate the given expression for every group and mark the groups
        for which it is true as completed.

        @param expression SQL expression. The
            following names are defined and can be used in the expression:
            - @c _args - The argument table
            - @c _state_previous - The state of the group at the previous
                iteration
            - @c _state_current - The state of the group at the current
                iteration
        @return None if the evaluation query returns no rows. Otherwise the
            result of bool_and over the status element (cast to boolean) of
            all state rows of the current iteration, i.e. True only if no
            group is still in progress.

        For each row whose expression is true, the status element of the
        matching state row of the current iteration is set to 1, unless it
        is already 2 or larger.
        """
        # Two format passes: the first one fills in the expression and the
        # column names, the second one fills in the doubly-braced fields
        # and any placeholders contained in the expression itself.
        resultObject = self.runSQL("""
            SELECT
                ({expression}) AS _expression,
                ARRAY[{{grouping_str}}] AS _groups
            FROM {{rel_args}} AS _args
            left outer join (
                (
                    SELECT {{grouping_col}}, {col_grp_state} AS _state_previous
                    FROM {{rel_state}}
                    WHERE {col_grp_iteration} = {{iteration}} - 1
                ) sub1
                JOIN
                (
                    SELECT {{grouping_col}}, {col_grp_state} AS _state_current
                    FROM {{rel_state}}
                    WHERE {col_grp_iteration} = {{iteration}}
                ) sub2
                {using_str}
            ) AS subq1 ON True
            """.format(expression=expression, **self.kwargs).
            format(iteration=self.iteration, **self.kwargs))

        if resultObject.nrows() == 0:
            return None

        for each_elem in resultObject:
            # only groups that completed iterating need a status update
            if not each_elem['_expression']:
                continue
            array_str = self._group_array_literal(mad_vec(each_elem["_groups"]))
            self.runSQL("""
                UPDATE {rel_state} set {col_grp_state}[array_upper({col_grp_state}, 1)] = 1
                WHERE
                    ARRAY[{grouping_str}] = {_group_val} and
                    {col_grp_state}[array_upper({col_grp_state}, 1)] < 2 and
                    {col_grp_iteration} = {iteration}
                """.format(
                    _group_val=array_str,
                    iteration=self.iteration,
                    **self.kwargs))

        # return True only if all group combinations have finished iterating
        rv = self.runSQL(
            """
            select bool_and({col_grp_state}[array_upper({col_grp_state}, 1)]::integer::boolean) as rst
            from {rel_state} as _state_table
            where _state_table.{col_grp_iteration} = {iteration}
            """.format(
                iteration=self.iteration,
                **self.kwargs))[0]["rst"]
        return rv

    def test(self, condition):
        """
        Test if the given condition is satisfied. The condition is cast to
        BOOLEAN and handed to evaluate().

        @param condition Boolean SQL expression. The same names as in
            evaluate() can be used:
            - @c _args - The argument table
            - @c _state_previous - The state of the group at the previous
                iteration
            - @c _state_current - The state of the group at the current
                iteration
        @return The return value of evaluate() for the cast condition
        """
        return self.evaluate("CAST(({0}) AS BOOLEAN)".format(condition))

    def update(self, newState, **updateKwargs):
        """
        Update the inter-iteration state

        @param newState SQL expression of type
            <tt>self.kwargs.stateType</tt>. The
            following names are defined and can be used in the expression:
            - @c _src - The source relation (rel_source)
            - @c rel_state - The rows of the state table holding the state
                of the previous iteration
            .
            All template parameters of this controller can be used as
            <tt>{name}</tt> placeholders. <tt>{iteration}</tt> is the
            iteration number before this update.
        @param updateKwargs Accepted for interface compatibility; not used.

        This increments the iteration counter and inserts one new state row
        per group, computed by evaluating @c newState. Only groups whose
        status element is 0 (in progress) are advanced, except when the new
        iteration number is 1, where all groups are advanced. The history
        of all old states is kept.

        NOTE(review): the initial rows are stored with iteration 0 while
        self.iteration starts at -1, so the first call only finds
        predecessor rows if self.iteration was set to 0 beforehand.
        Presumably callers do this inside the with block - confirm.
        """
        newState = newState.format(iteration=self.iteration, **self.kwargs)
        self.iteration = self.iteration + 1
        if self.is_group_null:
            groupby_str = ""
        else:
            groupby_str = "group by {grouping_col}".format(**self.kwargs)
        self.runSQL(
            """
            insert into {rel_state}
            (select
                {grouping_col},
                {iteration},
                ({newState})
            from
                ({rel_source} AS _src
                join
                {rel_state} AS rel_state
                {using_str})
            where
                rel_state.{col_grp_iteration} = {iteration} - 1 and
                (case when {iteration} = 1 then
                    True
                else
                    rel_state.{col_grp_state}[array_upper(rel_state.{col_grp_state}, 1)] = 0
                end)
            {groupby_str})
            """.format(
                groupby_str=groupby_str,
                iteration=self.iteration,
                newState=newState,
                **self.kwargs))
| |
| m4_changequote(<!`!>, <!'!>) |