blob: 32f2f598c49af8add9ba16297c7f0785dc5232a8 [file] [log] [blame]
# coding=utf-8
"""
@file control.py_in
@brief controller classes (e.g. iteration controller)
@namespace utilities
@brief driver functions shared by modules
"""
import plpy
from distutils.util import strtobool
from functools import wraps
from utilities import extract_keyvalue_params
from utilities import unique_string
# Build-time flag. The right-hand side is an m4 macro call, not Python: this
# file is presumably run through m4 before being loaded, which would replace
# the call with the literal True when __HAS_FUNCTION_PROPERTIES__ is defined
# and with False otherwise -- TODO confirm against the build scripts.
HAS_FUNCTION_PROPERTIES = m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `True', `False')
# from https://coderwall.com/p/0lk6jg/python-decorators-vs-context-managers-have-your-cake-and-eat-it
class ContextDecorator(object):
    """ Base class that lets a context manager double as a decorator

    Derive a context manager from this class and an instance of it can be
    applied to a function with the @ syntax: every call of the decorated
    function then runs inside "with <instance>:".
    """

    def __init__(self, **kwargs):
        # Publish each keyword argument as an attribute of the instance
        vars(self).update(kwargs)

    def __enter__(self):
        # The object bound by "with ... as x" is the manager itself
        return self

    def __exit__(self, typ, val, traceback):
        # Nothing to clean up by default; returning None lets any exception
        # raised inside the with-block propagate
        return None

    def __call__(self, f):
        def run_in_context(*args, **kw):
            with self:
                return f(*args, **kw)

        # Copy the name/docstring of f onto the wrapper before handing it out
        return wraps(f)(run_in_context)
class SetGUC(ContextDecorator):
    """
    @brief: A wrapper that sets a GUC to a new value and then sets it
        back to the original value on exit

    This context manager sets the specified GUC to the value passed in.
    If the GUC cannot be read with "show" (plpy.SPIError), the manager
    silently does nothing on both entry and exit.
    """

    def __init__(self, guc_name, new_guc_value, error_on_fail=True):
        """
        @param guc_name Name of the GUC to change
        @param new_guc_value Value assigned to the GUC inside the context
        @param error_on_fail If True, entering the context with an empty
            new value reports an error through plpy.error
        """
        self.guc_name = guc_name
        self.new_guc_value = new_guc_value
        if not self.guc_name or not self.new_guc_value:
            plpy.error("Both guc_name and new_guc_value need to have a non null "
                       "value")
        self.error_on_fail = error_on_fail
        self.guc_exists = True
        # Value of the GUC before it was changed; None until __enter__ reads it
        self.old_value = None

    def __enter__(self):
        """ Remember the current value of the GUC, then apply the new one

        @return self
        """
        if self.guc_exists:
            # check if allowed to change the GUC
            try:
                show_query = "show {0}".format(self.guc_name)
                self.old_value = plpy.execute(show_query)[0][
                    "{0}".format(self.guc_name)]
            except plpy.SPIError:
                # "show" failed: treat the GUC as unavailable and do nothing
                self.guc_exists = False
                return self
            if self.new_guc_value:
                plpy.execute("set {0}='{1}'".format(self.guc_name,
                                                    self.new_guc_value))
            elif self.error_on_fail:
                # Only reachable if new_guc_value was cleared after __init__
                plpy.error("Cannot set {0} to None. Please provide a valid value"
                           .format(self.guc_name))
        return self

    def __exit__(self, *args):
        """ Restore the value the GUC had when the context was entered """
        if args and args[0]:
            # an exception was raised in code, return False so that any
            # exception is re-raised after exit. The transaction will not
            # commit leading to reset of any change to parameter.
            return False
        # Compare against None rather than truthiness so that an original
        # value of '' (empty string) is restored as well.
        if self.guc_exists and self.old_value is not None:
            plpy.execute("set {0}='{1}'".
                         format(self.guc_name, self.old_value))
class OptimizerControl(ContextDecorator):
    """
    @brief: A wrapper that enables/disables the optimizer and
        then sets it back to the original value on exit

    This context manager accesses two GUCs:
        optimizer: GUC to control the optimizer used in Greenplum
        optimizer_control: Used to check if 'optimizer' GUC can be updated by MADlib.
                            This is assumed to be True if the GUC is not available
    """

    def __init__(self, enable=True, error_on_fail=False):
        """
        @param enable True to turn the optimizer on inside the context,
            False to turn it off
        @param error_on_fail If True, report an error through plpy.error when
            'optimizer_control' forbids changing the 'optimizer' GUC
        """
        self.to_enable = enable
        self.error_on_fail = error_on_fail
        # use the fact that all GPDB versions that have the
        # optimizer also define function properties
        self.guc_exists = bool(HAS_FUNCTION_PROPERTIES)
        # Both are filled in by __enter__ when the GUC exists
        self.optimizer_control = False
        self.optimizer_enabled = False

    def __enter__(self):
        """ Record the current 'optimizer' setting and apply the requested one

        @return self, so that "with OptimizerControl(...) as x" binds the
            manager like the other wrappers in this module
        """
        if self.guc_exists:
            # check if allowed to change the GUC
            try:
                self.optimizer_control = bool(strtobool(
                    plpy.execute("show optimizer_control")[0]["optimizer_control"]))
            except plpy.SPIError:
                # 'optimizer_control' is not available: assume changes are allowed
                self.optimizer_control = True

            if self.optimizer_control:
                self.optimizer_enabled = bool(strtobool(
                    plpy.execute("show optimizer")[0]["optimizer"]))
                new_optimizer = 'on' if self.to_enable else 'off'
                plpy.execute("set optimizer={0}".format(new_optimizer))
            elif self.error_on_fail:
                plpy.error("Unable to change 'optimizer' value. "
                           "Set 'optimizer_control = on' to proceed.")
        return self

    def __exit__(self, *args):
        """ Put the 'optimizer' GUC back to the value seen on entry """
        if args and args[0]:
            # an exception was raised in code, return False so that any
            # exception is re-raised after exit. The transaction will not
            # commit leading to reset of any change to parameter.
            return False
        if self.guc_exists and self.optimizer_control:
            old_optimizer = 'on' if self.optimizer_enabled else 'off'
            plpy.execute("set optimizer={0}".format(old_optimizer))
class HashaggControl(ContextDecorator):
    """
    @brief: A wrapper that enables/disables the hashagg and then sets it back
        to the original value on exit

    This context manager should be used at the top-level and any exception
    raised from this should be re-raised (if caught) to ensure the transaction
    does not commit.
    """

    def __init__(self, enable=True):
        """
        @param enable True to turn enable_hashagg on inside the context,
            False to turn it off
        """
        self.to_enable = enable
        self.hashagg_enabled = False
        self.guc_exists = True

    def __enter__(self):
        """ Record the current enable_hashagg setting and apply the new one

        A plpy.SPIError while reading or setting the GUC marks it as
        unavailable; the context then becomes a no-op.

        @return self
        """
        try:
            enable_hashagg = plpy.execute("show enable_hashagg")[0]["enable_hashagg"]
            self.hashagg_enabled = bool(strtobool(enable_hashagg))
            plpy.execute("set enable_hashagg={0}".
                         format('on' if self.to_enable else 'off'))
        except plpy.SPIError:
            self.guc_exists = False
        # Deliberately not inside a "finally": a return there would silently
        # discard every exception other than SPIError.
        return self

    def __exit__(self, *args):
        """ Put enable_hashagg back to the value seen on entry """
        if args and args[0]:
            # an exception was raised in code. We return False so that any
            # exception is re-raised after exit. The transaction will not
            # commit leading to reset of parameter value.
            return False
        if self.guc_exists:
            plpy.execute("set enable_hashagg={0}".
                         format('on' if self.hashagg_enabled else 'off'))
class MinWarning(ContextDecorator):
    """
    @brief A wrapper that temporarily changes client_min_messages, i.e. the
        level of the log messages sent to the client

    This context manager should be used at the top-level and any exception
    raised from this should be re-raised (if caught) to ensure the transaction
    does not commit.
    """

    def __init__(self, warningLevel='error'):
        self.warningLevel = warningLevel

    def __enter__(self):
        # Remember the level currently in effect so __exit__ can restore it
        settings_rows = plpy.execute("""
            SELECT setting FROM pg_settings WHERE name='client_min_messages'
            """)
        self.oldMsgLevel = settings_rows[0]['setting']

        set_level_sql = "SET client_min_messages = {warningLevel}".format(
            warningLevel=self.warningLevel)
        plpy.execute(set_level_sql)
        return self

    def __exit__(self, *args):
        if not (args and args[0]):
            # No exception: put client_min_messages back to its old level
            restore_sql = "SET client_min_messages = {oldMsgLevel}; ".format(
                oldMsgLevel=self.oldMsgLevel)
            plpy.execute(restore_sql)
            return None
        # An exception was raised in the with-block. Returning False re-raises
        # it after exit; the transaction will not commit, which resets
        # client_min_messages.
        return False
class AOControl(ContextDecorator):
    """
    @brief: A wrapper that enables/disables the AO storage option

    This context manager should be used at the top-level and any exception
    raised from this should be re-raised (if caught) to ensure the transaction
    does not commit.
    """

    def __init__(self, enable=False):
        """
        @param enable Value given to the 'appendonly' storage option inside
            the context
        """
        self.to_enable = enable
        self.was_ao_enabled = False
        self.guc_exists = True
        self.storage_options_dict = {}

    def _parse_gp_default_storage_options(self, gp_default_storage_options_str):
        """ Parse comma separated key=value pairs

        Example:
             appendonly=false,blocksize=32768,compresstype=none,checksum=true,orientation=row

        The result is stored in self.storage_options_dict, with the
        'appendonly' entry converted to a Python bool.
        """
        self.storage_options_dict = extract_keyvalue_params(gp_default_storage_options_str)
        self.storage_options_dict['appendonly'] = bool(
            strtobool(self.storage_options_dict['appendonly']))

    @property
    def _gp_default_storage_options(self):
        """ Storage options rendered back into a "k1=v1,k2=v2" string """
        # items() instead of iteritems(): available on both Python 2 and 3
        return ','.join(['{0}={1}'.format(k, v)
                         for k, v in self.storage_options_dict.items()])

    def __enter__(self):
        """ Back up the current 'appendonly' option and apply the requested one

        @return self
        """
        # We first check if we can get the guc value from the database using the
        # show command. If this fails, then we assume that the guc doesn't exist
        # and we ignore the error and return. This can happen when platform is
        # postgres, or platform is gpdb but the guc doesn't exist anymore.
        try:
            _storage_options_str = plpy.execute(
                "show gp_default_storage_options")[0]["gp_default_storage_options"]
        except plpy.SPIError:
            self.guc_exists = False
            return self

        self._parse_gp_default_storage_options(_storage_options_str)

        # Set APPENDONLY=<enable> after backing up existing value
        self.was_ao_enabled = self.storage_options_dict['appendonly']
        self.storage_options_dict['appendonly'] = self.to_enable
        plpy.execute("set gp_default_storage_options='{0}'".
                     format(self._gp_default_storage_options))
        return self

    def __exit__(self, *args):
        """ Restore the backed-up 'appendonly' option, even after an exception """
        if self.guc_exists:
            self.storage_options_dict['appendonly'] = self.was_ao_enabled
            plpy.execute("set gp_default_storage_options='{0}'".
                         format(self._gp_default_storage_options))
        if args and args[0]:
            # an exception was raised in code. We return False so that any
            # exception is re-raised after exit.
            return False
class IterationController:
    r"""
    @brief Abstraction for implementing driver functions in PL/Python

    This class encapsulates handling of the inter-iteration state. The design
    goal is to avoid any conversion between backend-native types and those of
    procedural languages like PL/Python. Therefore, the expectation is that
    ***only "template" parameters are passed as PL/Python arguments***,
    whereas non-template arguments are provided in an argument table. Here,
    "template" arguments are those parameters that cannot be SQL parameters,
    ***such as table and column names***.

    The inter-iteration state table contains two columns:
    - <tt>_iteration INTEGER</tt> - The 0-based iteration number
    - <tt>_state <em>self.kwargs.stateType</em></tt> - The state (after
      iteration \c _iteration)
    """

    def __init__(self, rel_args, rel_state, stateType,
                 temporaryTables=True,
                 truncAfterIteration=False,
                 schema_madlib="MADLIB_SCHEMA_MISSING",
                 verbose=False,
                 initialize_state=False,
                 **kwargs):
        # Temporary relations are addressed through the pg_temp schema
        schema_prefix = 'pg_temp.' if temporaryTables else ''

        self.kwargs = kwargs
        self.kwargs.update(
            unqualified_rel_state=rel_state,
            rel_args=schema_prefix + rel_args,
            rel_state=schema_prefix + rel_state,
            stateType=stateType.format(schema_madlib=schema_madlib),
            schema_madlib=schema_madlib)

        self.temporaryTables = temporaryTables
        self.truncAfterIteration = truncAfterIteration
        self.verbose = verbose
        self.initialize_state = initialize_state
        self.inWith = False
        # No state has been written yet; update() moves this to 0
        self.iteration = -1

    def __enter__(self):
        table_kind = 'TEMPORARY' if self.temporaryTables else ''
        with MinWarning('warning'):
            # (Re)create the table that holds one state row per iteration
            self.runSQL("""
                DROP TABLE IF EXISTS {rel_state};
                CREATE {temp} TABLE {unqualified_rel_state} (
                    _iteration INTEGER PRIMARY KEY,
                    _state {stateType}
                )
                """.format(temp=table_kind, **self.kwargs))
            if self.initialize_state:
                # Seed the table with a NULL state for iteration 0
                self.runSQL("""
                    INSERT INTO {rel_state} VALUES (0, NULL)
                    """.format(**self.kwargs))
        self.inWith = True
        return self

    def __exit__(self, type, value, tb):
        self.inWith = False

    def runSQL(self, sql):
        """
        Execute \c sql through plpy, echoing it as a notice first when the
        controller was created with verbose=True

        @return The result object of plpy.execute
        """
        if self.verbose:
            plpy.notice(sql)
        return plpy.execute(sql)

    def evaluate(self, expression):
        r"""
        Evaluate the given expression. The expression may depend on
        the current inter-iteration state and all arguments

        @param expression SQL expression. The
            following names are defined and can be used in the expression:
            - \c _args - The (single-row) argument table
            - \c _state - The row of the state table containing the latest
              inter-iteration state
        @return None if the query yields no row, otherwise the value of
            \c expression
        """
        # For GPDB 4.3 we disable the optimizer (ORCA) for the query planner
        # since currently ORCA has a bug for left outer joins (MPP-21868).
        # This should be removed when the issue is fixed in ORCA.
        with OptimizerControl(False):
            # Two-stage formatting: the expression is spliced in first so
            # that placeholders it contains are resolved by the second pass
            rows = self.runSQL(
                """
                SELECT ({expression}) AS expression
                FROM {{rel_args}} AS _args
                    LEFT OUTER JOIN (
                        SELECT _state
                        FROM {{rel_state}} AS _state
                        WHERE _state._iteration = {{iteration}}
                    ) AS _state ON True
                """.format(expression=expression).format(
                    iteration=self.iteration, **self.kwargs))

        if rows.nrows() == 0:
            return None
        return rows[0]['expression']

    def test(self, condition):
        r"""
        Test if the given condition is satisfied. The condition may depend on
        the current inter-iteration state and all arguments

        @param condition Boolean SQL expression. The
            following names are defined and can be used in the condition:
            - \c _args - The (single-row) argument table
            - \c _state - The row of the state table containing the latest
              inter-iteration state
        @return None if \c condition evaluates to NULL, otherwise the Boolean
            value of \c condition
        """
        boolean_expression = "CAST(({0}) AS BOOLEAN)".format(condition)
        return self.evaluate(boolean_expression)

    def update(self, newState, **updateKwargs):
        r"""
        Update the inter-iteration state

        @param newState SQL expression of type
            <tt>stateType.kwargs.stateType</tt>. The
            following names are defined and can be used in the expression:
            - \c _args - The (single-row) argument table
            - \c _state - The row of the state table containing the latest
              inter-iteration state
            .
            Note that <tt>{iteration}</tt> will still be the current iteration.
            For instance, it could be used in the expression as a WHERE
            condition: <tt>[...] WHERE _state._iteration = {iteration}</tt>

        This updates the current inter-iteration state to the result of
        evaluating \c newState. If <tt>self.truncAfterIteration</tt> is true,
        this will replace the old state, otherwise the history of all old states
        is kept.
        """
        # Controller-level parameters take precedence over per-call ones
        updateKwargs.update(**self.kwargs)
        # Placeholders in newState are resolved with the pre-increment iteration
        rendered_state = newState.format(iteration=self.iteration,
                                         **updateKwargs)
        self.iteration += 1

        insert_sql = """
            INSERT INTO {rel_state}
            SELECT
                {iteration},
                ({newState})
            """.format(iteration=self.iteration,
                       newState=rendered_state,
                       **self.kwargs)
        self.runSQL(insert_sql)

        if self.truncAfterIteration:
            # Keep only the row that was just written
            prune_sql = """
                DELETE FROM {rel_state} AS _state
                WHERE _state._iteration < {iteration}
                """.format(iteration=self.iteration, **self.kwargs)
            self.runSQL(prune_sql)
class IterationController2D(IterationController):
    """
    @brief In-memory Iteration for 2-D array states
    """

    def __exit__(self, type, value, tb):
        self.inWith = False

    def evaluate(self, expression):
        r"""
        Evaluate the given expression. The expression may depend on
        the current inter-iteration state and all arguments

        @param expression SQL expression. The
            following names are defined and can be used in the expression:
            - \c _args - The (single-row) argument table
            - \c _state - The row of the state table containing the latest
              inter-iteration state
        @return None if the query yields no row, otherwise the value of
            \c expression
        """
        # We disable the optimizer (ORCA) for the query planning
        # since ORCA has a bug for left outer joins (MPP-21868).
        # This should be removed when the issue is fixed in ORCA.
        with OptimizerControl(False):
            # All columns of the current state row are exposed (SELECT *)
            rows = self.runSQL(
                """
                SELECT ({expression}) AS expression
                FROM {{rel_args}} AS _args
                    LEFT OUTER JOIN (
                        SELECT *
                        FROM {{rel_state}} AS _state
                        WHERE _state._iteration = {{iteration}}
                    ) AS _state ON True
                """.format(expression=expression).format(
                    iteration=self.iteration, **self.kwargs))

        if rows.nrows() == 0:
            return None
        return rows[0]['expression']

    def update(self, newState, **updateKwargs):
        r"""
        Update the inter-iteration state

        @param newState SQL expression of type
            <tt>stateType.kwargs.stateType</tt>. The
            following names are defined and can be used in the expression:
            - \c _args - The (single-row) argument table
            - \c _state - The row of the state table containing the latest
              inter-iteration state
            .
            Note that <tt>{iteration}</tt> will still be the current iteration.
            For instance, it could be used in the expression as a WHERE
            condition: <tt>[...] WHERE _state._iteration = {iteration}</tt>

        This updates the current inter-iteration state to the result of
        evaluating \c newState. If <tt>self.truncAfterIteration</tt> is true,
        this will replace the old state, otherwise the history of all old states
        is kept.
        """
        # Controller-level parameters take precedence over per-call ones
        updateKwargs.update(**self.kwargs)
        # Placeholders in newState are resolved with the pre-increment iteration
        rendered_state = newState.format(iteration=self.iteration,
                                         **updateKwargs)
        self.iteration += 1

        insert_sql = """
            INSERT INTO {rel_state}
            SELECT
                {iteration},
                ({newState})
            """.format(iteration=self.iteration,
                       newState=rendered_state,
                       **self.kwargs)
        self.runSQL(insert_sql)

        if self.truncAfterIteration:
            # Keep only the row that was just written
            prune_sql = """
                DELETE FROM {rel_state} AS _state
                WHERE _state._iteration < {iteration}
                """.format(iteration=self.iteration, **self.kwargs)
            self.runSQL(prune_sql)
class IterationController2S(IterationController):
    """
    @brief Designed for the case where the state type is 1-D double array and
        both the old state and new state are required
    """

    def evaluate(self, expression):
        r"""
        Evaluate the given SQL expression against the argument table and the
        two most recent states

        @param expression SQL expression. The
            following names are defined and can be used in the expression:
            - \c _args - The (single-row) argument table
            - \c _state - A row with the columns \c _state_previous (state of
              iteration - 1) and \c _state_current (state of the current
              iteration)
        @return None if the query yields no row, otherwise the value of
            \c expression
        """
        # Two-stage formatting: the expression is spliced in first so that
        # placeholders it contains are resolved by the second pass.
        # NOTE(review): unlike the sibling controllers, this query is not run
        # under OptimizerControl(False) -- confirm that this is intended.
        query = """
            SELECT ({expression}) AS expression
            FROM {{rel_args}} AS _args
                LEFT OUTER JOIN (
                    SELECT
                        _state_previous, _state_current
                    FROM
                    (
                        SELECT _state AS _state_previous
                        FROM {{rel_state}}
                        WHERE _iteration = {{iteration}} - 1
                    ) sub1,
                    (
                        SELECT _state AS _state_current
                        FROM {{rel_state}}
                        WHERE _iteration = {{iteration}}
                    ) sub2
                ) AS _state ON True
            """.format(expression=expression).format(
                iteration=self.iteration, **self.kwargs)
        rows = self.runSQL(query)

        if rows.nrows() == 0:
            return None
        return rows[0]['expression']