| # coding=utf-8 |
| m4_changequote(`<!', `!>') |
| |
| """ |
| @file control.py_in |
| |
| @brief controller classes (e.g. iteration controller) |
| |
| @namespace utilities |
| |
| @brief driver functions shared by modules |
| """ |
| |
| import plpy |
| |
| from utilities import __mad_version |
| version_wrapper = __mad_version() |
| from utilities import unique_string |
| _unique_string = unique_string |
| |
| |
class MinWarning:
    """
    @brief Context manager that temporarily changes client_min_messages

    On entry the current value of the \c client_min_messages setting is read
    from \c pg_settings and remembered, then the setting is changed to the
    requested level. On exit the remembered value is written back.
    """

    def __init__(self, warningLevel='error'):
        """
        @param warningLevel Value assigned to \c client_min_messages while the
            \c with block is active
        """
        self.warningLevel = warningLevel

    def __enter__(self):
        # Remember the level currently in effect so __exit__ can put it back.
        current_setting = plpy.execute("""
            SELECT setting FROM pg_settings WHERE name='client_min_messages'
        """)
        self.oldMsgLevel = current_setting[0]['setting']

        set_level_sql = """
            SET client_min_messages = {warningLevel}
        """.format(warningLevel=self.warningLevel)
        plpy.execute(set_level_sql)
        return self

    def __exit__(self, *args):
        # Exception details (if any) are ignored; the level is always restored.
        restore_level_sql = """
            SET client_min_messages = {oldMsgLevel};
        """.format(oldMsgLevel=self.oldMsgLevel)
        plpy.execute(restore_level_sql)
| |
| |
| class IterationControllerComposite: |
| """ |
| @brief Abstraction for implementing driver functions in PL/Python |
| |
| This class encapsulates handling of the inter-iteration state. The design |
| goal is to avoid any conversion between backend-native types and those of |
| procedureal languages like PL/Python. Therefore, the expectation is that |
| |
| ***all only "template" parameters are passes as PL/Python arguments***, |
| |
| whereas non-template arguments are provided in an argument table. Here, |
| "template" arguments are those parameters that cannot be SQL parameters, |
| |
| ***such as table and column names***. |
| |
| The inter-state iteration table contains two columns: |
| - <tt>_iteration INTEGER</tt> - The 0-based iteration number |
| - <tt>_state <em>self.kwargs.stateType</em></tt> - The state (after |
| iteration \c _interation) |
| """ |
| |
| def __init__(self, rel_args, rel_state, stateType, |
| temporaryTables=True, |
| truncAfterIteration=False, |
| schema_madlib="MADLIB_SCHEMA_MISSING", |
| verbose=False, |
| **kwargs): |
| self.kwargs = kwargs |
| self.kwargs.update( |
| unqualified_rel_state=rel_state, |
| rel_args=('pg_temp.' if temporaryTables else '') + rel_args, |
| rel_state=('pg_temp.' if temporaryTables else '') + rel_state, |
| stateType=stateType.format(schema_madlib=schema_madlib), |
| schema_madlib = schema_madlib) |
| self.temporaryTables = temporaryTables |
| self.truncAfterIteration = truncAfterIteration |
| self.verbose = verbose |
| self.inWith = False |
| self.iteration = -1 |
| m4_ifdef(<!__HAWQ__!>, <! |
| self.new_state = None |
| self.old_state = None |
| !>) |
| |
| def __enter__(self): |
| with MinWarning('warning'): |
| self.runSQL(""" |
| DROP TABLE IF EXISTS {rel_state}; |
| CREATE {temp} TABLE {unqualified_rel_state} ( |
| _iteration INTEGER m4_ifdef(<!__HAWQ__!>, <!!>, <!PRIMARY KEY!>), |
| _state {stateType} |
| )m4_ifdef(<!__POSTGRESQL__!>, <!!>, <!DISTRIBUTED BY (_iteration)!>); |
| """.format( |
| temp='TEMPORARY' if self.temporaryTables else '', |
| **self.kwargs)) |
| self.inWith = True |
| return self |
| |
| def __exit__(self, type, value, tb): |
| self.inWith = False |
| m4_ifdef(<!__HAWQ__!>, <! |
| insert_plan=plpy.prepare(""" |
| INSERT INTO {rel_state} |
| SELECT $1, CAST( ({schema_madlib}.array_to_2d($2), $3, $4, $5) AS {schema_madlib}.kmeans_state) |
| """.format(**self.kwargs), [ "INTEGER", "DOUBLE PRECISION[]", |
| "INTEGER[]", "DOUBLE PRECISION", "DOUBLE PRECISION" ]) |
| plpy.execute(insert_plan, [ self.new_state['_iteration'], |
| self.new_state['centroids'], |
| self.new_state['old_centroid_ids'], |
| self.new_state['objective_fn'], |
| self.new_state['frac_reassigned'] ]) |
| !>) |
| |
| def runSQL(self, sql): |
| if self.verbose: |
| plpy.notice(sql) |
| return plpy.execute(sql) |
| |
| def evaluate(self, expression): |
| """ |
| Evaluate the given expression. The expression may depend on |
| the current inter-iteration state and all arguments |
| |
| @param expression SQL expression. The |
| following names are defined and can be used in the condition: |
| - \c _args - The (single-row) argument table |
| - \c _state - The row of the state table containing the latest |
| inter-iteration state |
| @return None if \c expression evaluates to NULL, otherwise the value of |
| \c expression |
| """ |
| |
| ## For GPDB 4.3 we disable the optimzer (ORCA) for the query planner |
| ## since currently ORCA has a bug for left outer joins (MPP-21868). |
| ## This should be removed when the issue is fixed in ORCA. |
| if version_wrapper.is_gp43() or version_wrapper.is_hawq(): |
| optimizer = plpy.execute("SHOW optimizer")[0]['optimizer'] |
| plpy.execute("SET optimizer = off") |
| |
| m4_ifdef(<!__HAWQ__!>, <! |
| cast_str = "CAST (({schema_madlib}.array_to_2d($1), $2, $3, $4) AS {schema_madlib}.kmeans_state)".format(**self.kwargs) |
| cast_type = [ "DOUBLE PRECISION[]", "INTEGER[]", "DOUBLE PRECISION", "DOUBLE PRECISION" ] |
| cast_para = [None if self.new_state is None else self.new_state[i] |
| for i in ('centroids', 'old_centroid_ids', |
| 'objective_fn', 'frac_reassigned')] |
| eval_plan = plpy.prepare(""" |
| SELECT ({expression}) AS expression |
| FROM {{rel_args}} AS _args |
| LEFT OUTER JOIN ( |
| SELECT {{schema_madlib}}.array_to_2d($1) AS _state |
| ) AS _state ON True |
| """.format(expression = expression).format( |
| iteration = self.iteration, |
| curr_state=cast_str, **self.kwargs), cast_type) |
| |
| resultObject = plpy.execute(eval_plan, cast_para) |
| !>, <! |
| resultObject = self.runSQL(""" |
| SELECT ({expression}) AS expression |
| FROM {{rel_args}} AS _args |
| LEFT OUTER JOIN ( |
| SELECT * |
| FROM {{rel_state}} AS _state |
| WHERE _state._iteration = {{iteration}} |
| ) AS _state ON True |
| """.format(expression = expression).format( |
| iteration = self.iteration, |
| **self.kwargs)) |
| !>) |
| |
| if version_wrapper.is_gp43() or version_wrapper.is_hawq(): |
| plpy.execute("SET optimizer = " + optimizer) |
| |
| if resultObject.nrows() == 0: |
| return None |
| else: |
| return resultObject[0]['expression'] |
| |
| def test(self, condition): |
| """ |
| Test if the given condition is satisfied. The condition may depend on |
| the current inter-iteration state and all arguments |
| |
| @param condition Boolean SQL expression. The |
| following names are defined and can be used in the condition: |
| - \c _args - The (single-row) argument table |
| - \c _state - The row of the state table containing the latest |
| inter-iteration state |
| @return None if \c condition evaluates to NULL, otherwise the Boolean |
| value of \c condition |
| """ |
| |
| return self.evaluate(""" |
| CAST(({condition}) AS BOOLEAN) |
| """.format(condition = condition)) |
| |
| def update(self, newState, **updateKwargs): |
| """ |
| Update the inter-iteration state |
| |
| @param newState SQL expression of type |
| <tt>stateType.kwargs.stateType</tt>. The |
| following names are defined and can be used in the condition: |
| - \c _args - The (single-row) argument table |
| - \c _state - The row of the state table containing the latest |
| inter-iteration state |
| . |
| Note that <tt>{iteration}</tt> will still be the current iteration. |
| For instance, it could be used in the expression as a WHERE |
| condition: <tt>[...] WHERE _state._iteration = {iteration}</tt> |
| |
| This updates the current inter-iteration state to the result of |
| evaluating \c newState. If <tt>self.truncAfterIteration</tt> is true, |
| this will replace the old state, otherwise the history of all old states |
| is kept. |
| """ |
| updateKwargs.update(**self.kwargs) |
| newState = newState.format( |
| iteration = self.iteration, |
| **updateKwargs) |
| self.iteration = self.iteration + 1 |
| |
| m4_ifdef(<!__HAWQ__!>, <! |
| cast_str = """CAST (({schema_madlib}.array_to_2d($1), $2, $3, $4) |
| AS {schema_madlib}.kmeans_state)""".format(**self.kwargs) |
| cast_str_old = """CAST (({schema_madlib}.array_to_2d($5), $6, $7, $8) |
| AS {schema_madlib}.kmeans_state)""".format(**self.kwargs) |
| cast_type = [ |
| "DOUBLE PRECISION[]", "INTEGER[]", "DOUBLE PRECISION", "DOUBLE PRECISION", |
| "DOUBLE PRECISION[]", "INTEGER[]", "DOUBLE PRECISION", "DOUBLE PRECISION" |
| ] |
| |
| cast_para = [None if self.new_state is None else self.new_state[i] |
| for i in ('centroids', 'old_centroid_ids', 'objective_fn', |
| 'frac_reassigned')] |
| cast_para.extend([None if self.old_state is None else self.old_state[i] |
| for i in ('centroids', 'old_centroid_ids', 'objective_fn', |
| 'frac_reassigned')]) |
| |
| updateKwargs.update(curr_state=cast_str, old_state=cast_str_old) |
| self.old_state = self.new_state |
| |
| update_plan = plpy.prepare(""" |
| SELECT |
| {iteration} AS _iteration, |
| {schema_madlib}.array_to_1d((_state).centroids) AS centroids, |
| (_state).old_centroid_ids, |
| (_state).objective_fn, |
| (_state).frac_reassigned |
| FROM |
| ( |
| SELECT ({newState}) AS _state |
| ) q |
| """.format(iteration=self.iteration, |
| newState=newState, **self.kwargs).format(**updateKwargs), |
| cast_type) |
| self.new_state = plpy.execute(update_plan, cast_para)[0] |
| !>, <! |
| self.runSQL(""" |
| INSERT INTO {rel_state} |
| SELECT |
| {iteration}, |
| ({newState}) |
| """.format( |
| iteration = self.iteration, |
| newState = newState, |
| **self.kwargs)) |
| !>) |
| |
| m4_ifdef(<!__HAWQ__!>, <!!>, <! |
| if self.truncAfterIteration: |
| self.runSQL(""" |
| DELETE FROM {rel_state} AS _state |
| WHERE _state._iteration < {iteration} |
| """.format(iteration = self.iteration, **self.kwargs)) |
| !>) |
| |
| m4_changequote(<!`!>, <!'!>) |