| # coding=utf-8 |
| |
| """ |
| @file control.py_in |
| |
| @brief controller classes (e.g. iteration controller) |
| |
| @namespace utilities |
| |
| @brief driver functions shared by modules |
| """ |
| import plpy |
| |
| from distutils.util import strtobool |
| from functools import wraps |
| |
| from utilities import extract_keyvalue_params |
| from utilities import unique_string |
| |
# Build-time flag: the m4 preprocessor replaces the right-hand side with True
# when the __HAS_FUNCTION_PROPERTIES__ macro is defined, and with False
# otherwise. This line is not valid Python until the file has been run
# through m4.
HAS_FUNCTION_PROPERTIES = m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `True', `False')
| |
| |
| # from https://coderwall.com/p/0lk6jg/python-decorators-vs-context-managers-have-your-cake-and-eat-it |
class ContextDecorator(object):
    """ Base class that lets a context manager double as a decorator

    Derive a context manager from this class, then either use an instance in
    a "with" statement or apply the instance to a function; in the latter
    case every call of the function runs inside "with instance:".
    """

    def __init__(self, **kwargs):
        # Every keyword argument becomes an attribute of the instance.
        vars(self).update(kwargs)

    def __enter__(self):
        # The object returned here is what "with ... as x" binds to x.
        return self

    def __exit__(self, typ, val, traceback):
        # Nothing to clean up in the base class. Returning None (falsy)
        # means an exception raised in the body keeps propagating.
        return None

    def __call__(self, f):
        manager = self

        @wraps(f)
        def wrapper(*args, **kw):
            # The return stays inside the with-block so that the value is
            # handed back only after __exit__ has run.
            with manager:
                return f(*args, **kw)

        return wrapper
| |
| |
class SetGUC(ContextDecorator):
    """
    @brief: A wrapper that sets a GUC to a new value and puts the original
            value back on exit

    On entry the current value is read with "show <guc_name>" and saved, then
    the GUC is set to new_guc_value. If reading the GUC raises plpy.SPIError,
    the GUC is treated as unavailable and both entry and exit do nothing.
    On exit without an exception the saved value is restored. When the body
    raised, nothing is restored and False is returned so that the exception
    propagates.

    @param guc_name Name of the GUC to change (must be non-empty)
    @param new_guc_value Value assigned while the context is active (must be
                         truthy)
    @param error_on_fail If True, report an error through plpy.error when no
                         new value is available at entry time
    """

    def __init__(self, guc_name, new_guc_value, error_on_fail=True):
        self.guc_name = guc_name
        self.new_guc_value = new_guc_value
        if not self.guc_name or not self.new_guc_value:
            plpy.error("Both guc_name and new_guc_value need to have a non null "
                       "value")
        self.error_on_fail = error_on_fail
        self.guc_exists = True
        self.old_value = None

    def __enter__(self):
        if not self.guc_exists:
            return self

        # Read the current value first. A failure here is taken to mean that
        # the GUC is not available, in which case there is nothing to change.
        try:
            show_query = "show {0}".format(self.guc_name)
            current_row = plpy.execute(show_query)[0]
            self.old_value = current_row["{0}".format(self.guc_name)]
        except plpy.SPIError:
            self.guc_exists = False
            return self

        if self.new_guc_value:
            plpy.execute("set {0}='{1}'".format(self.guc_name,
                                              self.new_guc_value))
        elif self.error_on_fail:
            # __init__ already rejects empty values, so this is a safety net
            # for instances whose new_guc_value was cleared afterwards.
            plpy.error("Cannot set {0} to None. Please provide a valid value"
                       .format(self.guc_name))
        return self

    def __exit__(self, *args):
        if args and args[0]:
            # An exception was raised in the body. Return False so that it
            # is re-raised after exit. The transaction will not commit,
            # leading to reset of any change to the parameter.
            return False

        # Compare against None rather than truthiness so that an original
        # value of '' is restored as well.
        if self.guc_exists and self.old_value is not None:
            plpy.execute("set {0}='{1}'".
                         format(self.guc_name, self.old_value))
| |
| |
class OptimizerControl(ContextDecorator):
    """
    @brief: A wrapper that enables/disables the optimizer and
            then sets it back to the original value on exit

    This context manager accesses two GUCs:
    optimizer: GUC to control the optimizer used in Greenplum

    optimizer_control: Used to check if 'optimizer' GUC can be updated by MADlib.
                       This is assumed to be True if the GUC is not available

    @param enable True to switch the optimizer on inside the context, False
                  to switch it off
    @param error_on_fail If True, report an error through plpy.error when
                         optimizer_control forbids changing 'optimizer'
    """

    def __init__(self, enable=True, error_on_fail=False):
        self.to_enable = enable
        self.error_on_fail = error_on_fail

        # use the fact that all GPDB versions that have the
        # optimizer also define function properties
        self.guc_exists = bool(HAS_FUNCTION_PROPERTIES)

        # Defaults so that __exit__ is a harmless no-op even if __enter__
        # never ran or bailed out early.
        self.optimizer_control = False
        self.optimizer_enabled = False

    def __enter__(self):
        if self.guc_exists:
            # check if allowed to change the GUC
            try:
                self.optimizer_control = bool(strtobool(
                    plpy.execute("show optimizer_control")[0]["optimizer_control"]))
            except plpy.SPIError:
                # optimizer_control could not be read: assume changes are allowed
                self.optimizer_control = True

            if self.optimizer_control:
                # remember the current setting so __exit__ can restore it
                self.optimizer_enabled = bool(strtobool(
                    plpy.execute("show optimizer")[0]["optimizer"]))
                new_optimizer = 'on' if self.to_enable else 'off'
                plpy.execute("set optimizer={0}".format(new_optimizer))
            elif self.error_on_fail:
                plpy.error("Unable to change 'optimizer' value. "
                           "Set 'optimizer_control = on' to proceed.")
        # Return the manager so that "with OptimizerControl(...) as ctl" binds
        # the instance, like the other context managers in this module.
        return self

    def __exit__(self, *args):
        if args and args[0]:
            # An exception was raised in the body. Return False so that it
            # is re-raised after exit. The transaction will not commit,
            # leading to reset of any change to the parameter.
            return False

        if self.guc_exists and self.optimizer_control:
            plpy.execute("set optimizer={0}".
                         format('on' if self.optimizer_enabled else 'off'))
| |
| |
class HashaggControl(ContextDecorator):

    """
    @brief: A wrapper that enables/disables the hashagg and then sets it back
        to the original value on exit

    This context manager should be used at the top-level and any exception
    raised from this should be re-raised (if caught) to ensure the transaction
    does not commit.

    @param enable True to set enable_hashagg to 'on' inside the context,
                  False to set it to 'off'
    """

    def __init__(self, enable=True):
        self.to_enable = enable
        self.hashagg_enabled = False
        self.guc_exists = True

    def __enter__(self):
        try:
            enable_hashagg = plpy.execute("show enable_hashagg")[0]["enable_hashagg"]
            # remember the current setting so __exit__ can restore it
            self.hashagg_enabled = bool(strtobool(enable_hashagg))
            plpy.execute("set enable_hashagg={0}".
                         format('on' if self.to_enable else 'off'))
        except plpy.SPIError:
            # The GUC could not be read or set: treat it as unavailable and
            # make __exit__ a no-op.
            self.guc_exists = False
        # Deliberately not returned from a "finally" clause: a return there
        # would silently discard any exception other than plpy.SPIError.
        return self

    def __exit__(self, *args):
        if args and args[0]:
            # An exception was raised in the body. Return False so that it
            # is re-raised after exit. The transaction will not commit,
            # leading to reset of the parameter value.
            return False

        if self.guc_exists:
            plpy.execute("set enable_hashagg={0}".
                         format('on' if self.hashagg_enabled else 'off'))
| |
class MinWarning(ContextDecorator):

    """
    @brief A wrapper that temporarily changes client_min_messages

    On entry the current client_min_messages setting is read from pg_settings
    and remembered, then the setting is changed to the requested level. On a
    clean exit the remembered level is put back.

    This context manager should be used at the top-level and any exception
    raised from this should be re-raised (if caught) to ensure the transaction
    does not commit.

    @param warningLevel Value assigned to client_min_messages inside the
                        context
    """

    def __init__(self, warningLevel='error'):
        self.warningLevel = warningLevel

    def __enter__(self):
        setting_rows = plpy.execute("""
            SELECT setting FROM pg_settings WHERE name='client_min_messages'
            """)
        self.oldMsgLevel = setting_rows[0]['setting']

        set_statement = "SET client_min_messages = {warningLevel}".format(
            warningLevel=self.warningLevel)
        plpy.execute(set_statement)
        return self

    def __exit__(self, *args):
        raised = args[0] if args else None
        if raised:
            # The body raised. Returning False re-raises the exception after
            # exit; the transaction will not commit, which resets
            # client_min_messages.
            return False

        # Clean exit: put the remembered level back.
        reset_statement = "SET client_min_messages = {oldMsgLevel}; ".format(
            oldMsgLevel=self.oldMsgLevel)
        plpy.execute(reset_statement)
| |
| |
class AOControl(ContextDecorator):

    """
    @brief: A wrapper that enables/disables the AO storage option

    On entry the gp_default_storage_options GUC is read, its appendonly entry
    is replaced by the requested value and the GUC is set to the result. On
    exit the previous appendonly value is put back, whether or not the body
    raised.

    This context manager should be used at the top-level and any exception
    raised from this should be re-raised (if caught) to ensure the transaction
    does not commit.

    @param enable Value assigned to appendonly inside the context
    """

    def __init__(self, enable=False):
        self.to_enable = enable
        self.was_ao_enabled = False
        self.guc_exists = True
        self.storage_options_dict = dict()

    def _parse_gp_default_storage_options(self, gp_default_storage_options_str):
        """ Parse comma separated key=value pairs

        The parsed pairs are stored in self.storage_options_dict and the
        appendonly entry is converted to a Python bool.

        Example:
             appendonly=false,blocksize=32768,compresstype=none,checksum=true,orientation=row
        """
        self.storage_options_dict = extract_keyvalue_params(gp_default_storage_options_str)
        self.storage_options_dict['appendonly'] = bool(
            strtobool(self.storage_options_dict['appendonly']))

    @property
    def _gp_default_storage_options(self):
        """ Serialize storage_options_dict back to "k1=v1,k2=v2,..." """
        # dict.items() works on both Python 2 and Python 3, whereas
        # dict.iteritems() exists on Python 2 only.
        return ','.join(['{0}={1}'.format(k, v)
                         for k, v in self.storage_options_dict.items()])

    def __enter__(self):
        # We first check if we can get the guc value from the database using the
        # show command. If this fails, then we assume that the guc doesn't exist
        # and we ignore the error and return. This can happen when platform is
        # postgres, or platform is gpdb but the guc doesn't exist anymore.
        try:
            _storage_options_str = plpy.execute(
                "show gp_default_storage_options")[0]["gp_default_storage_options"]
        except plpy.SPIError:
            self.guc_exists = False
            return self

        if self.guc_exists:
            self._parse_gp_default_storage_options(_storage_options_str)
            # Set APPENDONLY=<enable> after backing up existing value
            self.was_ao_enabled = self.storage_options_dict['appendonly']
            self.storage_options_dict['appendonly'] = self.to_enable
            plpy.execute("set gp_default_storage_options='{0}'".
                         format(self._gp_default_storage_options))

        return self

    def __exit__(self, *args):
        # Unlike the other managers in this module, the previous value is
        # restored before checking whether the body raised.
        if self.guc_exists:
            self.storage_options_dict['appendonly'] = self.was_ao_enabled
            plpy.execute("set gp_default_storage_options='{0}'".
                         format(self._gp_default_storage_options))
        if args and args[0]:
            # An exception was raised in the body. Return False so that it
            # is re-raised after exit.
            return False
| |
| |
class IterationController:

    """
    @brief Abstraction for implementing driver functions in PL/Python

    This class encapsulates handling of the inter-iteration state. The design
    goal is to avoid any conversion between backend-native types and those of
    procedural languages like PL/Python. Therefore, the expectation is that

    ***only "template" parameters are passed as PL/Python arguments***,

    whereas non-template arguments are provided in an argument table. Here,
    "template" arguments are those parameters that cannot be SQL parameters,

    ***such as table and column names***.

    The inter-iteration state table contains two columns:
    - <tt>_iteration INTEGER</tt> - The 0-based iteration number
    - <tt>_state <em>self.kwargs.stateType</em></tt> - The state (after
      iteration _iteration)
    """

    def __init__(self, rel_args, rel_state, stateType,
                 temporaryTables=True,
                 truncAfterIteration=False,
                 schema_madlib="MADLIB_SCHEMA_MISSING",
                 verbose=False,
                 initialize_state=False,
                 **kwargs):
        """
        @param rel_args Name of the argument table
        @param rel_state Name of the state table
        @param stateType SQL type of the _state column; a {schema_madlib}
               placeholder in it is substituted
        @param temporaryTables If True, both table names are qualified with
               pg_temp. and the state table is created as TEMPORARY
        @param truncAfterIteration If True, update() deletes the rows of
               earlier iterations
        @param schema_madlib Schema name made available to all templates
        @param verbose If True, runSQL() echoes each statement as a notice
        @param initialize_state If True, entering the context inserts the row
               (0, NULL) into the state table
        @param kwargs Additional values made available to all templates
        """
        schema_prefix = 'pg_temp.' if temporaryTables else ''
        derived = dict(
            unqualified_rel_state=rel_state,
            rel_args=schema_prefix + rel_args,
            rel_state=schema_prefix + rel_state,
            stateType=stateType.format(schema_madlib=schema_madlib),
            schema_madlib=schema_madlib)

        self.kwargs = kwargs
        self.kwargs.update(derived)

        self.temporaryTables = temporaryTables
        self.truncAfterIteration = truncAfterIteration
        self.verbose = verbose
        self.initialize_state = initialize_state
        self.inWith = False
        # -1 means that no state has been written yet
        self.iteration = -1

    def __enter__(self):
        """ (Re)create the state table and mark the controller as active """
        with MinWarning('warning'):
            temp_keyword = 'TEMPORARY' if self.temporaryTables else ''
            create_sql = """
                DROP TABLE IF EXISTS {rel_state};
                CREATE {temp} TABLE {unqualified_rel_state} (
                    _iteration INTEGER PRIMARY KEY,
                    _state {stateType}
                )
                """.format(temp=temp_keyword, **self.kwargs)
            self.runSQL(create_sql)

            if self.initialize_state:
                seed_sql = """
                    INSERT INTO {rel_state} VALUES (0, NULL)
                    """.format(**self.kwargs)
                self.runSQL(seed_sql)
        self.inWith = True
        return self

    def __exit__(self, type, value, tb):
        """ Mark the controller as inactive; exceptions are not suppressed """
        self.inWith = False

    def runSQL(self, sql):
        """ Execute sql through plpy, echoing it first when verbose is set """
        if self.verbose:
            plpy.notice(sql)
        return plpy.execute(sql)

    def evaluate(self, expression):
        """
        Evaluate the given expression. The expression may depend on
        the current inter-iteration state and all arguments

        @param expression SQL expression. The
            following names are defined and can be used in the expression:
            - _args - The (single-row) argument table
            - _state - The row of the state table containing the latest
              inter-iteration state
        @return None if the query returns no row, otherwise the value of
            the expression in the first returned row
        """

        # For GPDB 4.3 we disable the optimizer (ORCA) for the query planner
        # since currently ORCA has a bug for left outer joins (MPP-21868).
        # This should be removed when the issue is fixed in ORCA.
        with OptimizerControl(False):
            # Two formatting passes: the first splices in the expression and
            # turns the doubled braces into single ones, the second fills in
            # the iteration number and the stored template values.
            template = """
                SELECT ({expression}) AS expression
                FROM {{rel_args}} AS _args
                    LEFT OUTER JOIN (
                        SELECT _state
                        FROM {{rel_state}} AS _state
                        WHERE _state._iteration = {{iteration}}
                    ) AS _state ON True
                """.format(expression=expression)
            query = template.format(iteration=self.iteration, **self.kwargs)
            resultObject = self.runSQL(query)

        if resultObject.nrows() != 0:
            return resultObject[0]['expression']
        return None

    def test(self, condition):
        """
        Test if the given condition is satisfied. The condition may depend on
        the current inter-iteration state and all arguments

        @param condition Boolean SQL expression. The
            following names are defined and can be used in the condition:
            - _args - The (single-row) argument table
            - _state - The row of the state table containing the latest
              inter-iteration state
        @return Whatever evaluate() returns for the condition cast to BOOLEAN
        """
        boolean_expression = "CAST(({0}) AS BOOLEAN)".format(condition)
        return self.evaluate(boolean_expression)

    def update(self, newState, **updateKwargs):
        """
        Update the inter-iteration state

        @param newState SQL expression of type
            <tt>self.kwargs.stateType</tt>. The
            following names are defined and can be used in the expression:
            - _args - The (single-row) argument table
            - _state - The row of the state table containing the latest
              inter-iteration state
            .
            Note that <tt>{iteration}</tt> will still be the current iteration.
            For instance, it could be used in the expression as a WHERE
            condition: <tt>[...] WHERE _state._iteration = {iteration}</tt>
        @param updateKwargs Extra values for placeholders in newState; the
            values stored on the controller take precedence

        This updates the current inter-iteration state to the result of
        evaluating newState. If <tt>self.truncAfterIteration</tt> is true,
        this will replace the old state, otherwise the history of all old states
        is kept.
        """
        # Values stored on the controller override caller-supplied ones.
        updateKwargs.update(**self.kwargs)
        # newState is expanded with the iteration number *before* it is
        # advanced.
        state_expression = newState.format(iteration=self.iteration,
                                           **updateKwargs)
        self.iteration += 1

        insert_sql = """
            INSERT INTO {rel_state}
            SELECT
                {iteration},
                ({newState})
            """.format(iteration=self.iteration,
                       newState=state_expression,
                       **self.kwargs)
        self.runSQL(insert_sql)

        if not self.truncAfterIteration:
            return

        # Keep only the row that was just written.
        prune_sql = """
            DELETE FROM {rel_state} AS _state
            WHERE _state._iteration < {iteration}
            """.format(iteration=self.iteration, **self.kwargs)
        self.runSQL(prune_sql)
| |
| |
class IterationController2D(IterationController):

    """
    @brief In-memory Iteration for 2-D array states
    """

    def __exit__(self, type, value, tb):
        """ Mark the controller as inactive; exceptions are not suppressed """
        self.inWith = False

    def evaluate(self, expression):
        """
        Evaluate the given expression. The expression may depend on
        the current inter-iteration state and all arguments

        @param expression SQL expression. The
            following names are defined and can be used in the expression:
            - _args - The (single-row) argument table
            - _state - The row of the state table containing the latest
              inter-iteration state (all columns of that row are selected)
        @return None if the query returns no row, otherwise the value of
            the expression in the first returned row
        """
        # We disable the optimizer (ORCA) for the query planning
        # since ORCA has a bug for left outer joins (MPP-21868).
        # This should be removed when the issue is fixed in ORCA.
        with OptimizerControl(False):
            # Two formatting passes: the first splices in the expression and
            # turns the doubled braces into single ones, the second fills in
            # the iteration number and the stored template values.
            template = """
                SELECT ({expression}) AS expression
                FROM {{rel_args}} AS _args
                    LEFT OUTER JOIN (
                        SELECT *
                        FROM {{rel_state}} AS _state
                        WHERE _state._iteration = {{iteration}}
                    ) AS _state ON True
                """.format(expression=expression)
            query = template.format(iteration=self.iteration, **self.kwargs)
            resultObject = self.runSQL(query)

        if resultObject.nrows() != 0:
            return resultObject[0]['expression']
        return None

    def update(self, newState, **updateKwargs):
        """
        Update the inter-iteration state

        @param newState SQL expression of type
            <tt>self.kwargs.stateType</tt>. The
            following names are defined and can be used in the expression:
            - _args - The (single-row) argument table
            - _state - The row of the state table containing the latest
              inter-iteration state
            .
            Note that <tt>{iteration}</tt> will still be the current iteration.
            For instance, it could be used in the expression as a WHERE
            condition: <tt>[...] WHERE _state._iteration = {iteration}</tt>
        @param updateKwargs Extra values for placeholders in newState; the
            values stored on the controller take precedence

        This updates the current inter-iteration state to the result of
        evaluating newState. If <tt>self.truncAfterIteration</tt> is true,
        this will replace the old state, otherwise the history of all old states
        is kept.
        """
        # Values stored on the controller override caller-supplied ones.
        updateKwargs.update(**self.kwargs)
        # newState is expanded with the iteration number *before* it is
        # advanced.
        state_expression = newState.format(iteration=self.iteration,
                                           **updateKwargs)
        self.iteration += 1

        insert_sql = """
            INSERT INTO {rel_state}
            SELECT
                {iteration},
                ({newState})
            """.format(iteration=self.iteration,
                       newState=state_expression,
                       **self.kwargs)
        self.runSQL(insert_sql)

        if not self.truncAfterIteration:
            return

        # Keep only the row that was just written.
        prune_sql = """
            DELETE FROM {rel_state} AS _state
            WHERE _state._iteration < {iteration}
            """.format(iteration=self.iteration, **self.kwargs)
        self.runSQL(prune_sql)
| |
| |
class IterationController2S(IterationController):

    """
    @brief Designed for the case where the state type is 1-D double array and
        both the old state and new state are required
    """

    def evaluate(self, expression):
        """
        Evaluate the given expression against the previous and current state

        @param expression SQL expression. The
            following names are defined and can be used in the expression:
            - _args - The argument table
            - _state - A row with the columns _state_previous (state of
              iteration - 1) and _state_current (state of the current
              iteration)
        @return None if the query returns no row, otherwise the value of
            the expression in the first returned row
        """
        # Two formatting passes: the first splices in the expression and
        # turns the doubled braces into single ones, the second fills in
        # the iteration number and the stored template values.
        template = """
            SELECT ({expression}) AS expression
            FROM {{rel_args}} AS _args
                LEFT OUTER JOIN (
                    SELECT
                        _state_previous, _state_current
                    FROM
                    (
                        SELECT _state AS _state_previous
                        FROM {{rel_state}}
                        WHERE _iteration = {{iteration}} - 1
                    ) sub1,
                    (
                        SELECT _state AS _state_current
                        FROM {{rel_state}}
                        WHERE _iteration = {{iteration}}
                    ) sub2
                ) AS _state ON True
            """.format(expression=expression)
        query = template.format(iteration=self.iteration, **self.kwargs)
        resultObject = self.runSQL(query)

        if resultObject.nrows() != 0:
            return resultObject[0]['expression']
        return None
| |