| # coding=utf-8 |
| m4_changequote(`<!', `!>') |
| |
| """ |
| @file control.py_in |
| |
| @brief controller classes (e.g. iteration controller) |
| |
| @namespace utilities |
| |
| @brief driver functions shared by modules |
| """ |
| |
| import plpy |
| |
| from utilities import __mad_version |
| version_wrapper = __mad_version() |
| from utilities import unique_string |
| _unique_string = unique_string |
| |
| |
# Build-time feature flags. Each m4 conditional below is replaced by a literal
# True or False when the file is preprocessed, depending on whether the named
# macro is defined for the target platform.

# True when built for HAWQ: the iteration controllers then keep the
# inter-iteration state in Python memory and only write it to the state
# table when the controller is exited.
STATE_IN_MEM = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
# EnableOptimizer uses this flag to decide whether the "optimizer" GUC may be
# read and set.
HAS_FUNCTION_PROPERTIES = m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!True!>, <!False!>)
# Not referenced by the classes in this file.
UDF_ON_SEGMENT_NOT_ALLOWED = m4_ifdef(<!__UDF_ON_SEGMENT_NOT_ALLOWED__!>, <!True!>, <!False!>)
| |
| |
class EnableOptimizer(object):

    """
    @brief Context manager that switches the ORCA optimizer on or off and
           restores the previous setting when the block exits cleanly

    The "optimizer" setting is only read and written when
    HAS_FUNCTION_PROPERTIES is set; otherwise entering and leaving the block
    does nothing.
    """

    def __init__(self, to_enable=True):
        """
        @param to_enable Truthy to turn the optimizer on inside the block,
               falsy to turn it off
        """
        self.to_enable = to_enable
        # Setting found on entry; only meaningful once __enter__ has run
        self.optimizer_enabled = False
        # we depend on the fact that all GPDB/HAWQ versions that have the
        # optimizer also define function properties
        self.guc_exists = bool(HAS_FUNCTION_PROPERTIES)

    def __enter__(self):
        """
        Remember the current optimizer setting and apply the requested one.

        @return This context manager, so that "with ... as" binds a usable
                object (consistent with the other wrappers in this file)
        """
        if self.guc_exists:
            current = plpy.execute("show optimizer")[0]["optimizer"]
            self.optimizer_enabled = (current == 'on')
            plpy.execute("set optimizer={0}".format(
                'on' if self.to_enable else 'off'))
        return self

    def __exit__(self, *args):
        """
        Restore the optimizer setting found on entry.

        @return False when the block raised, so the exception propagates;
                None otherwise
        """
        if args and args[0]:
            # An exception was raised inside the block. Returning False makes
            # the interpreter re-raise it. The setting is deliberately not
            # restored here: the transaction will not commit, which resets
            # the optimizer setting.
            return False
        if self.guc_exists:
            plpy.execute("set optimizer={0}".format(
                'on' if self.optimizer_enabled else 'off'))
| |
| |
class EnableHashagg(object):

    """
    @brief Context manager that switches enable_hashagg on or off and
           restores the previous setting when the block exits cleanly

    If the setting cannot be read or changed, the manager silently becomes a
    no-op (guc_exists is set to False).
    """

    def __init__(self, to_enable=True):
        """
        @param to_enable Truthy to turn hash aggregation on inside the block,
               falsy to turn it off
        """
        self.to_enable = to_enable
        # Setting found on entry; only meaningful once __enter__ has run
        self.hashagg_enabled = False
        # Optimistic default; cleared by __enter__ if the setting is unusable
        self.guc_exists = True

    def __enter__(self):
        """
        Remember the current enable_hashagg setting and apply the requested
        one.

        @return This context manager
        """
        try:
            current = plpy.execute("show enable_hashagg")[0]["enable_hashagg"]
            self.hashagg_enabled = (current == 'on')
            plpy.execute("set enable_hashagg={0}".format(
                'on' if self.to_enable else 'off'))
        except Exception:
            # Best effort: treat any failure as "setting not available".
            # Only Exception is caught so that KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            self.guc_exists = False
        return self

    def __exit__(self, *args):
        """
        Restore the enable_hashagg setting found on entry.

        @return False when the block raised, so the exception propagates;
                None otherwise
        """
        if args and args[0]:
            # An exception was raised inside the block. Returning False makes
            # the interpreter re-raise it. The setting is deliberately not
            # restored here: the transaction will not commit, which resets
            # enable_hashagg.
            return False
        if self.guc_exists:
            plpy.execute("set enable_hashagg={0}".format(
                'on' if self.hashagg_enabled else 'off'))
| |
| |
class MinWarning:

    """
    @brief Context manager that temporarily changes client_min_messages,
           i.e. the level of log messages sent to the client
    """

    def __init__(self, warningLevel='error'):
        """
        @param warningLevel Value assigned to client_min_messages while the
               block is active
        """
        self.warningLevel = warningLevel

    def __enter__(self):
        """
        Save the current client_min_messages and apply the requested level.

        @return This context manager
        """
        current_setting = plpy.execute("""
            SELECT setting FROM pg_settings WHERE name='client_min_messages'
            """)
        self.oldMsgLevel = current_setting[0]['setting']
        set_statement = "SET client_min_messages = {warningLevel}".format(
            warningLevel=self.warningLevel)
        plpy.execute(set_statement)
        return self

    def __exit__(self, *args):
        """
        Restore the saved client_min_messages unless the block raised.

        @return False when the block raised, so the exception propagates;
                None otherwise
        """
        if not args or not args[0]:
            # Clean exit: put the previous message level back
            restore_statement = "SET client_min_messages = {oldMsgLevel}; ".format(
                oldMsgLevel=self.oldMsgLevel)
            plpy.execute(restore_statement)
            return None
        # An exception was raised inside the block. Returning False makes the
        # interpreter re-raise it. The transaction will not commit, which
        # resets client_min_messages.
        return False
| |
| |
class IterationController:

    """
    @brief Abstraction for implementing driver functions in PL/Python

    This class encapsulates handling of the inter-iteration state. The design
    goal is to avoid any conversion between backend-native types and those of
    procedural languages like PL/Python. Therefore, the expectation is that

    ***only "template" parameters are passed as PL/Python arguments***,

    whereas non-template arguments are provided in an argument table. Here,
    "template" arguments are those parameters that cannot be SQL parameters,

    ***such as table and column names***.

    The inter-iteration state table contains two columns:
    - <tt>_iteration INTEGER</tt> - The 0-based iteration number
    - <tt>_state <em>self.kwargs.stateType</em></tt> - The state (after
      iteration <tt>_iteration</tt>)

    When STATE_IN_MEM is set, the state is held in <tt>self.new_state</tt> /
    <tt>self.old_state</tt> instead and is written to the state table only
    when the controller is exited.
    """

    def __init__(self, rel_args, rel_state, stateType,
                 temporaryTables=True,
                 truncAfterIteration=False,
                 schema_madlib="MADLIB_SCHEMA_MISSING",
                 verbose=False,
                 initialize_state=False,
                 **kwargs):
        """
        @param rel_args Name of the argument table
        @param rel_state Name of the state table
        @param stateType SQL type of the state column; a
               <tt>{schema_madlib}</tt> placeholder in it is substituted
        @param temporaryTables If true, both tables are addressed in pg_temp
               and the state table is created as TEMPORARY
        @param truncAfterIteration If true, older states are deleted after
               each update
        @param schema_madlib Name of the MADlib schema
        @param verbose If true, every statement run through runSQL is also
               emitted as a notice
        @param initialize_state If true, a row (0, NULL) is inserted into the
               state table on entry
        @param kwargs Additional template arguments available to all SQL
               templates
        """
        table_prefix = 'pg_temp.' if temporaryTables else ''
        template_args = dict(
            unqualified_rel_state=rel_state,
            rel_args=table_prefix + rel_args,
            rel_state=table_prefix + rel_state,
            stateType=stateType.format(schema_madlib=schema_madlib),
            schema_madlib=schema_madlib)
        kwargs.update(template_args)
        self.kwargs = kwargs

        self.temporaryTables = temporaryTables
        self.truncAfterIteration = truncAfterIteration
        self.verbose = verbose
        self.initialize_state = initialize_state
        self.inWith = False
        # -1 means that no state has been produced yet
        self.iteration = -1
        if STATE_IN_MEM:
            self.new_state = {"_iteration": 0, "_state": None}
            self.old_state = {"_iteration": 0, "_state": None}

    def __enter__(self):
        """
        (Re)create the state table and optionally seed it with a NULL state.

        @return This controller
        """
        table_kind = 'TEMPORARY' if self.temporaryTables else ''
        with MinWarning('warning'):
            self.runSQL("""
                DROP TABLE IF EXISTS {rel_state};
                CREATE {temp} TABLE {unqualified_rel_state} (
                    _iteration INTEGER m4_ifdef(<!__HAWQ__!>, <!!>, <!PRIMARY KEY!>),
                    _state {stateType}
                )m4_ifdef(<!__POSTGRESQL__!>, <!!>, <!DISTRIBUTED BY (_iteration)!>);
                """.format(temp=table_kind, **self.kwargs))
            if self.initialize_state:
                self.runSQL("""
                    INSERT INTO {rel_state} VALUES (0, NULL)
                    """.format(**self.kwargs))
        self.inWith = True
        return self

    def __exit__(self, type, value, tb):
        """
        Leave the controller. With in-memory state, the latest state is
        written to the state table at this point.
        """
        self.inWith = False
        if not STATE_IN_MEM:
            return
        insert_plan = plpy.prepare(
            """
            INSERT INTO {rel_state}
            SELECT $1, $2
            """.format(**self.kwargs),
            ["INTEGER", "DOUBLE PRECISION[]"])
        final_state = self.new_state
        plpy.execute(insert_plan,
                     [final_state['_iteration'], final_state['_state']])

    def runSQL(self, sql):
        """
        Execute a SQL statement, echoing it as a notice first when verbose.

        @param sql Statement text
        @return Whatever plpy.execute returns for the statement
        """
        if self.verbose:
            plpy.notice(sql)
        return plpy.execute(sql)

    def evaluate(self, expression):
        """
        Evaluate the given expression. The expression may depend on
        the current inter-iteration state and all arguments

        @param expression SQL expression. The
            following names are defined and can be used in the expression:
            - <tt>_args</tt> - The (single-row) argument table
            - <tt>_state</tt> - The row of the state table containing the
              latest inter-iteration state
        @return None if the query yields no row, otherwise the value of
            the expression (None if it evaluates to NULL)
        """
        # For GPDB 4.3 we disable the optimizer (ORCA) for the query planner
        # since currently ORCA has a bug for left outer joins (MPP-21868).
        # This should be removed when the issue is fixed in ORCA.
        with EnableOptimizer(False):
            # The templates are formatted twice: the first pass inserts the
            # expression and unescapes the doubled braces, the second pass
            # fills in the template arguments (including any placeholders
            # that came in with the expression).
            if STATE_IN_MEM:
                query = """
                    SELECT ({expression}) AS expression
                    FROM {{rel_args}} AS _args
                        LEFT OUTER JOIN (
                            SELECT $1 AS _state
                        ) AS _state ON True
                    """.format(expression=expression)
                plan = plpy.prepare(
                    query.format(iteration=self.iteration, **self.kwargs),
                    ["DOUBLE PRECISION[]"])
                current_state = self.new_state['_state']
                # A missing state is passed as an empty array
                rows = plpy.execute(
                    plan, [[] if current_state is None else current_state])
            else:
                query = """
                    SELECT ({expression}) AS expression
                    FROM {{rel_args}} AS _args
                        LEFT OUTER JOIN (
                            SELECT _state
                            FROM {{rel_state}} AS _state
                            WHERE _state._iteration = {{iteration}}
                        ) AS _state ON True
                    """.format(expression=expression)
                rows = self.runSQL(
                    query.format(iteration=self.iteration, **self.kwargs))

        if rows.nrows() == 0:
            return None
        return rows[0]['expression']

    def test(self, condition):
        """
        Test if the given condition is satisfied. The condition may depend on
        the current inter-iteration state and all arguments

        @param condition Boolean SQL expression. The
            following names are defined and can be used in the condition:
            - <tt>_args</tt> - The (single-row) argument table
            - <tt>_state</tt> - The row of the state table containing the
              latest inter-iteration state
        @return None if the condition evaluates to NULL, otherwise the
            Boolean value of the condition
        """
        boolean_expression = "CAST(({0}) AS BOOLEAN)".format(condition)
        return self.evaluate(boolean_expression)

    def update(self, newState, **updateKwargs):
        """
        Update the inter-iteration state

        @param newState SQL expression of type
            <tt>self.kwargs.stateType</tt>. The
            following names are defined and can be used in the expression:
            - <tt>_args</tt> - The (single-row) argument table
            - <tt>_state</tt> - The row of the state table containing the
              latest inter-iteration state
            .
            Note that <tt>{iteration}</tt> will still be the current
            iteration. For instance, it could be used in the expression as a
            WHERE condition:
            <tt>[...] WHERE _state._iteration = {iteration}</tt>
        @param updateKwargs Extra template arguments for this call; the
            controller's own template arguments take precedence

        This updates the current inter-iteration state to the result of
        evaluating the expression. If <tt>self.truncAfterIteration</tt> is
        true, this will replace the old state, otherwise the history of all
        old states is kept.
        """
        updateKwargs.update(**self.kwargs)
        # Placeholders are resolved against the iteration that is current
        # *before* the counter is advanced
        state_sql = newState.format(iteration=self.iteration, **updateKwargs)
        self.iteration += 1

        if STATE_IN_MEM:
            previous = self.new_state
            self.old_state = previous
            # A remaining __state__ placeholder becomes the plan parameter
            # that carries the previous state
            plan_text = """
                SELECT {0} AS _iteration, ({1}) AS _state
                """.format(self.iteration, state_sql).format(__state__='$1')
            update_plan = plpy.prepare(plan_text, ["DOUBLE PRECISION[]"])
            self.new_state = plpy.execute(update_plan,
                                          [previous['_state']])[0]
        else:
            self.runSQL("""
                INSERT INTO {rel_state}
                SELECT
                    {iteration},
                    ({newState})
                """.format(iteration=self.iteration,
                           newState=state_sql,
                           **self.kwargs))
            if self.truncAfterIteration:
                self.runSQL("""
                    DELETE FROM {rel_state} AS _state
                    WHERE _state._iteration < {iteration}
                    """.format(iteration=self.iteration, **self.kwargs))
| |
| |
class IterationController2D(IterationController):

    """
    @brief In-memory Iteration for 2-D array states

    With in-memory state, the state is kept as a 1-D array in
    <tt>self.new_state</tt> (converted with array_to_1d on update) and is
    turned back into a 2-D array with array_to_2d whenever it is handed to
    SQL.
    """

    def __exit__(self, type, value, tb):
        """
        Leave the controller. With in-memory state, the latest state is
        converted to 2-D and written to the state table at this point.
        """
        self.inWith = False
        if not STATE_IN_MEM:
            return
        insert_plan = plpy.prepare(
            """
            INSERT INTO {rel_state}
            SELECT $1, {schema_madlib}.array_to_2d($2)
            """.format(**self.kwargs),
            ["INTEGER", "DOUBLE PRECISION[]"])
        plpy.execute(insert_plan,
                     [self.new_state['_iteration'], self.new_state['_state']])

    def evaluate(self, expression):
        """
        Evaluate the given expression. The expression may depend on
        the current inter-iteration state and all arguments

        @param expression SQL expression. The
            following names are defined and can be used in the expression:
            - <tt>_args</tt> - The (single-row) argument table
            - <tt>_state</tt> - The row of the state table containing the
              latest inter-iteration state
        @return None if the query yields no row, otherwise the value of
            the expression (None if it evaluates to NULL)
        """
        # We disable the optimizer (ORCA) for the query planning
        # since ORCA has a bug for left outer joins (MPP-21868).
        # This should be removed when the issue is fixed in ORCA.
        with EnableOptimizer(False):
            # The templates are formatted twice: the first pass inserts the
            # expression and unescapes the doubled braces, the second pass
            # fills in the template arguments.
            if STATE_IN_MEM:
                query = """
                    SELECT ({expression}) AS expression
                    FROM {{rel_args}} AS _args
                        LEFT OUTER JOIN (
                            SELECT {{schema_madlib}}.array_to_2d($1) AS _state
                        ) AS _state ON True
                    """.format(expression=expression)
                # iteration is supplied here as in the table-backed branch,
                # so an expression that uses the iteration placeholder
                # behaves the same in both modes instead of raising KeyError
                eval_plan = plpy.prepare(
                    query.format(iteration=self.iteration, **self.kwargs),
                    ["DOUBLE PRECISION[]"])
                current_state = self.new_state['_state']
                # A missing state is passed as an empty array
                resultObject = plpy.execute(
                    eval_plan,
                    [[] if current_state is None else current_state])
            else:
                query = """
                    SELECT ({expression}) AS expression
                    FROM {{rel_args}} AS _args
                        LEFT OUTER JOIN (
                            SELECT *
                            FROM {{rel_state}} AS _state
                            WHERE _state._iteration = {{iteration}}
                        ) AS _state ON True
                    """.format(expression=expression)
                resultObject = self.runSQL(
                    query.format(iteration=self.iteration, **self.kwargs))

        if resultObject.nrows() == 0:
            return None
        return resultObject[0]['expression']

    def update(self, newState, **updateKwargs):
        """
        Update the inter-iteration state

        @param newState SQL expression of type
            <tt>self.kwargs.stateType</tt>. The
            following names are defined and can be used in the expression:
            - <tt>_args</tt> - The (single-row) argument table
            - <tt>_state</tt> - The row of the state table containing the
              latest inter-iteration state
            .
            Note that <tt>{iteration}</tt> will still be the current
            iteration. For instance, it could be used in the expression as a
            WHERE condition:
            <tt>[...] WHERE _state._iteration = {iteration}</tt>
        @param updateKwargs Extra template arguments for this call; the
            controller's own template arguments take precedence

        This updates the current inter-iteration state to the result of
        evaluating the expression. If <tt>self.truncAfterIteration</tt> is
        true, this will replace the old state, otherwise the history of all
        old states is kept.
        """
        updateKwargs.update(**self.kwargs)
        # Placeholders are resolved against the iteration that is current
        # *before* the counter is advanced
        newState = newState.format(iteration=self.iteration, **updateKwargs)
        self.iteration = self.iteration + 1

        if STATE_IN_MEM:
            self.old_state = self.new_state
            update_plan = plpy.prepare(
                """
                SELECT
                    {iteration} AS _iteration,
                    {schema_madlib}.array_to_1d(({newState})) AS _state
                """.format(iteration=self.iteration,
                           newState=newState, **self.kwargs),
                ["DOUBLE PRECISION[]"])
            # The previous state (possibly None) is the plan's only parameter
            self.new_state = plpy.execute(
                update_plan, [self.old_state['_state']])[0]
        else:
            self.runSQL("""
                INSERT INTO {rel_state}
                SELECT
                    {iteration},
                    ({newState})
                """.format(iteration=self.iteration,
                           newState=newState,
                           **self.kwargs))
            if self.truncAfterIteration:
                self.runSQL("""
                    DELETE FROM {rel_state} AS _state
                    WHERE _state._iteration < {iteration}
                    """.format(iteration=self.iteration, **self.kwargs))
| |
| |
class IterationController2S(IterationController):

    """
    @brief Designed for the case where the state type is 1-D double array and
           both the old state and new state are required

    Expressions passed to evaluate can refer to <tt>_state_previous</tt> and
    <tt>_state_current</tt>.
    """

    def evaluate(self, expression):
        """
        Evaluate the given expression against the previous and the current
        inter-iteration state.

        @param expression SQL expression. The
            following names are defined and can be used in the expression:
            - <tt>_args</tt> - The (single-row) argument table
            - <tt>_state_previous</tt> - The state before the latest update
            - <tt>_state_current</tt> - The latest state
        @return None if the query yields no row, otherwise the value of
            the expression (None if it evaluates to NULL)
        """
        # NOTE(review): unlike the sibling controllers, this query is not run
        # with the optimizer disabled although it also uses a left outer
        # join - confirm whether that is intended.
        # The templates are formatted twice: the first pass inserts the
        # expression and unescapes the doubled braces, the second pass fills
        # in the template arguments.
        if STATE_IN_MEM:
            query = """
                SELECT ({expression}) AS expression
                FROM {{rel_args}} AS _args
                    LEFT OUTER JOIN (
                        SELECT
                            $1 AS _state_previous,
                            $2 AS _state_current
                    ) AS _state ON True
                """.format(expression=expression)
            plan = plpy.prepare(
                query.format(iteration=self.iteration, **self.kwargs),
                ["DOUBLE PRECISION[]", "DOUBLE PRECISION[]"])
            plan_args = [self.old_state['_state'], self.new_state['_state']]
            rows = plpy.execute(plan, plan_args)
        else:
            query = """
                SELECT ({expression}) AS expression
                FROM {{rel_args}} AS _args
                    LEFT OUTER JOIN (
                        SELECT
                            _state_previous, _state_current
                        FROM
                        (
                            SELECT _state AS _state_previous
                            FROM {{rel_state}}
                            WHERE _iteration = {{iteration}} - 1
                        ) sub1,
                        (
                            SELECT _state AS _state_current
                            FROM {{rel_state}}
                            WHERE _iteration = {{iteration}}
                        ) sub2
                    ) AS _state ON True
                """.format(expression=expression)
            rows = self.runSQL(
                query.format(iteration=self.iteration, **self.kwargs))

        if rows.nrows() == 0:
            return None
        return rows[0]['expression']

    # The accessors below exist only when the state is kept in memory
    if STATE_IN_MEM:
        def get_state_size(self):
            """
            @return <tt>len(self.new_state)</tt>
            """
            # NOTE(review): this is the length of the state row mapping, not
            # of the _state array it holds - confirm against callers whether
            # the array length was intended.
            return len(self.new_state)

        def get_state_value(self, index):
            """
            @param index Position within the current in-memory state array
            @return The element of the current state at that position
            """
            return self.new_state['_state'][index]
| |
| m4_changequote(<!`!> , <!'!>) |