| # coding=utf-8 |
| |
| """ |
| @file control.py_in |
| |
| @brief controller classes (e.g. iteration controller) |
| |
| @namespace utilities |
| |
| @brief driver functions shared by modules |
| """ |
| |
| import plpy |
| |
| from utilities import __mad_version |
| version_wrapper = __mad_version() |
| from utilities import unique_string |
| _unique_string = unique_string |
| from control import MinWarning |
| |
| m4_changequote(`<!', `!>') |
| |
| |
| class IterationControllerComposite: |
| """ |
| @brief Abstraction for implementing driver functions in PL/Python |
| |
| This class encapsulates handling of the inter-iteration state. The design |
| goal is to avoid any conversion between backend-native types and those of |
| procedureal languages like PL/Python. Therefore, the expectation is that |
| |
| ***all only "template" parameters are passes as PL/Python arguments***, |
| |
| whereas non-template arguments are provided in an argument table. Here, |
| "template" arguments are those parameters that cannot be SQL parameters, |
| |
| ***such as table and column names***. |
| |
| The inter-state iteration table contains two columns: |
| - <tt>_iteration INTEGER</tt> - The 0-based iteration number |
| - <tt>_state <em>self.kwargs.stateType</em></tt> - The state (after |
| iteration \c _interation) |
| """ |
| |
| def __init__(self, rel_args, rel_state, stateType, |
| temporaryTables=True, |
| truncAfterIteration=False, |
| schema_madlib="MADLIB_SCHEMA_MISSING", |
| verbose=False, |
| **kwargs): |
| self.kwargs = kwargs |
| self.kwargs.update( |
| unqualified_rel_state=rel_state, |
| rel_args=('pg_temp.' if temporaryTables else '') + rel_args, |
| rel_state=('pg_temp.' if temporaryTables else '') + rel_state, |
| stateType=stateType.format(schema_madlib=schema_madlib), |
| schema_madlib=schema_madlib) |
| self.temporaryTables = temporaryTables |
| self.truncAfterIteration = truncAfterIteration |
| self.verbose = verbose |
| self.inWith = False |
| self.iteration = -1 |
| |
| def __enter__(self): |
| with MinWarning('warning'): |
| self.runSQL(""" |
| DROP TABLE IF EXISTS {rel_state}; |
| CREATE {temp} TABLE {unqualified_rel_state} ( |
| _iteration INTEGER PRIMARY KEY, |
| _state {stateType} |
| )m4_ifdef(<!__POSTGRESQL__!>, <!!>, <!DISTRIBUTED BY (_iteration)!>); |
| """.format( |
| temp='TEMPORARY' if self.temporaryTables else '', |
| **self.kwargs)) |
| self.inWith = True |
| return self |
| |
| def __exit__(self, type, value, tb): |
| self.inWith = False |
| |
| def runSQL(self, sql): |
| if self.verbose: |
| plpy.notice(sql) |
| return plpy.execute(sql) |
| |
| def evaluate(self, expression): |
| """ |
| Evaluate the given expression. The expression may depend on |
| the current inter-iteration state and all arguments |
| |
| @param expression SQL expression. The |
| following names are defined and can be used in the condition: |
| - \c _args - The (single-row) argument table |
| - \c _state - The row of the state table containing the latest |
| inter-iteration state |
| @return None if \c expression evaluates to NULL, otherwise the value of |
| \c expression |
| """ |
| resultObject = self.runSQL(""" |
| SELECT ({expression}) AS expression |
| FROM {{rel_args}} AS _args |
| LEFT OUTER JOIN ( |
| SELECT * |
| FROM {{rel_state}} AS _state |
| WHERE _state._iteration = {{iteration}} |
| ) AS _state ON True |
| """.format(expression=expression). |
| format(iteration=self.iteration, |
| **self.kwargs)) |
| |
| if resultObject.nrows() == 0: |
| return None |
| else: |
| return resultObject[0]['expression'] |
| |
| def test(self, condition): |
| """ |
| Test if the given condition is satisfied. The condition may depend on |
| the current inter-iteration state and all arguments |
| |
| @param condition Boolean SQL expression. The |
| following names are defined and can be used in the condition: |
| - \c _args - The (single-row) argument table |
| - \c _state - The row of the state table containing the latest |
| inter-iteration state |
| @return None if \c condition evaluates to NULL, otherwise the Boolean |
| value of \c condition |
| """ |
| return self.evaluate("CAST(({0}) AS BOOLEAN)".format(condition)) |
| |
| def update(self, newState, **updateKwargs): |
| """ |
| Update the inter-iteration state |
| |
| @param newState SQL expression of type |
| <tt>stateType.kwargs.stateType</tt>. The |
| following names are defined and can be used in the condition: |
| - \c _args - The (single-row) argument table |
| - \c _state - The row of the state table containing the latest |
| inter-iteration state |
| . |
| Note that <tt>{iteration}</tt> will still be the current iteration. |
| For instance, it could be used in the expression as a WHERE |
| condition: <tt>[...] WHERE _state._iteration = {iteration}</tt> |
| |
| This updates the current inter-iteration state to the result of |
| evaluating \c newState. If <tt>self.truncAfterIteration</tt> is true, |
| this will replace the old state, otherwise the history of all old states |
| is kept. |
| """ |
| updateKwargs.update(**self.kwargs) |
| newState = newState.format(iteration=self.iteration, **updateKwargs) |
| self.iteration = self.iteration + 1 |
| self.runSQL(""" |
| INSERT INTO {rel_state} |
| SELECT |
| {iteration}, |
| ({newState}) |
| """.format(iteration=self.iteration, newState=newState, |
| **self.kwargs)) |
| if self.truncAfterIteration: |
| self.runSQL(""" |
| DELETE FROM {rel_state} AS _state |
| WHERE _state._iteration < {iteration} |
| """.format(iteration=self.iteration, **self.kwargs)) |