blob: 2522d1d70d9e58eb385b897d0ad2d2222fb1ecd9 [file] [log] [blame]
# coding=utf-8
"""
@file control.py_in
@brief controller classes (e.g. iteration controller)
@namespace utilities
@brief driver functions shared by modules
"""
import plpy
from control import MinWarning
from utilities import __mad_version
from utilities import unique_string
# Local alias of utilities.unique_string. GroupIterationController.__init__
# calls it to generate fresh column names (col_grp_null, col_grp_key).
_unique_string = unique_string
# Version helper object returned by utilities.__mad_version().
version_wrapper = __mad_version()
# Function selected by the version helper. GroupIterationController.evaluate
# applies it to the _groups array value returned by plpy to obtain a Python
# sequence of group values (presumably backend-version specific; confirm in
# utilities).
mad_vec = version_wrapper.select_vecfunc()
m4_changequote(`<!', `!>')
class GroupIterationController:
    r"""
    @brief Abstraction for implementing driver functions in PL/Python

    This class encapsulates handling of the inter-iteration state. The design
    goal is to avoid any conversion between backend-native types and those of
    procedural languages like PL/Python. Therefore, the expectation is that
    *** all "template" parameters are passed as PL/Python arguments ***,
    whereas non-template arguments are provided in an argument table. Here,
    "template" arguments are those parameters that cannot be SQL parameters,
    *** such as table and column names ***.

    This class assumes a transition state always has its status indicator
    in the last element. 0 means in progress, 1 means completed, > 1 means
    abnormal. Perhaps a C++ UDF should be added for extracting the status.

    The inter-state iteration table contains three kinds of columns:
    - <tt>_grouping_cols</tt> - List of columns that are provided as grouping
      arguments
    - <tt>_iteration INTEGER</tt> - The 0-based iteration number
    - <tt>_state <em>self.kwargs.stateType</em></tt> - The state (after
      iteration \c _iteration)

    The iteration and state column names default to <tt>_iteration</tt> and
    <tt>_state</tt> and can be changed with \c col_grp_iteration and
    \c col_grp_state.
    """

    def __init__(self, rel_args, rel_state, stateType,
                 temporaryTables=True,
                 schema_madlib="MADLIB_SCHEMA_MISSING",
                 verbose=False,
                 grouping_str="NULL",
                 col_grp_iteration="_iteration",
                 col_grp_state="_state",
                 **kwargs):
        """
        Store the template values used by every generated SQL statement.

        @param rel_args Name of the argument table; prefixed with
            <tt>pg_temp.</tt> when \\c temporaryTables is true
        @param rel_state Name of the state table; prefixed the same way
        @param stateType SQL type of the state; may contain a
            <tt>{schema_madlib}</tt> placeholder
        @param temporaryTables Whether the tables live in <tt>pg_temp</tt>
            (the state table is then created as TEMPORARY)
        @param schema_madlib Schema name substituted into templates
        @param verbose If true, runSQL() emits every statement via plpy.notice
        @param grouping_str SQL list placed inside <tt>ARRAY[...]</tt> to
            identify a group
        @param col_grp_iteration Name of the iteration column
        @param col_grp_state Name of the state column
        @param kwargs Further template values. \\c grouping_col is the
            comma-separated grouping column list, or None (or absent) for no
            grouping. \\c rel_source (the source relation) is required by
            __enter__() and update().
        """
        self.temporaryTables = temporaryTables
        self.verbose = verbose
        self.inWith = False
        # NOTE(review): __enter__ creates the iteration-0 rows and update()
        # reads rows at (new iteration - 1), special-casing iteration 1, so
        # callers presumably set this to 0 before the first update(). Confirm.
        self.iteration = -1
        self.grouping_str = grouping_str
        schema_prefix = 'pg_temp.' if temporaryTables else ''
        self.kwargs = kwargs
        self.kwargs.update(
            unqualified_rel_state=rel_state,
            rel_args=schema_prefix + rel_args,
            rel_state=schema_prefix + rel_state,
            stateType=stateType.format(schema_madlib=schema_madlib),
            schema_madlib=schema_madlib,
            grouping_str=self.grouping_str,
            col_grp_null=_unique_string(),
            col_grp_key=_unique_string(),
            col_grp_iteration=col_grp_iteration,
            col_grp_state=col_grp_state
        )
        raw_grouping_col = kwargs.get("grouping_col")
        self.is_group_null = raw_grouping_col is None
        if self.is_group_null:
            # Without grouping, every group-level join matches unconditionally.
            grouping_col = "Null"
            using_str = "on True"
        else:
            grouping_col = raw_grouping_col
            using_str = "using ({0})".format(raw_grouping_col)
        self.kwargs["grouping_col"] = grouping_col
        self.kwargs["using_str"] = using_str
        self.grouping_col = grouping_col

    def __enter__(self):
        """
        Set up the state table and return this controller.

        Drops the state table, then recreates it with one row per group of
        \\c rel_source at iteration 0 with a NULL state. On non-PostgreSQL
        builds it sets the distribution key. When grouping, it raises a plpy
        error if any grouping column contains NULL. Finally it adds a primary
        key on the iteration column (plus the grouping columns). All of this
        runs inside <tt>MinWarning('warning')</tt> (presumably limiting
        client messages to warnings and above).
        """
        with MinWarning('warning'):
            # currently assuming that groups is passed as a valid array
            if self.is_group_null:
                group_col = "NULL::integer as {col_grp_null}".format(**self.kwargs)
                groupby_str = "{col_grp_null}".format(**self.kwargs)
                primary_str = ""
            else:
                group_col = "{grouping_col}".format(**self.kwargs)
                groupby_str = group_col
                primary_str = ", {grouping_col}".format(**self.kwargs)
            self.runSQL(
                """
                drop table if exists {rel_state};
                create {temp} table {unqualified_rel_state} as (
                    select
                        {group_col},
                        0::integer as {col_grp_iteration},
                        Null::{stateType} as {col_grp_state}
                    from {rel_source}
                    group by {groupby_str}
                );
                m4_ifdef(<!__POSTGRESQL__!>, <!!>, <!
                alter table {rel_state} set distributed by ({col_grp_iteration} {primary_str});
                !>)
                """.format(group_col=group_col, groupby_str=groupby_str,
                           primary_str=primary_str,
                           temp='TEMPORARY' if self.temporaryTables else '',
                           **self.kwargs))
            if primary_str:
                # The grouping columns become part of the primary key below,
                # so they must not contain NULLs.
                # NOTE(review): assumes grouping_col is a plain comma-separated
                # column list; an expression containing commas would be split
                # incorrectly here.
                null_test = " or ".join(
                    g.strip() + " is NULL"
                    for g in self.kwargs['grouping_col'].split(","))
                null_count = self.runSQL(
                    """
                    select count(*) from {rel_state} where {null_test}
                    """.format(null_test=null_test, **self.kwargs))[0]['count']
                if null_count != 0:
                    plpy.error("Grouping error: at least one of the grouping columns contains NULL values!"
                               " Please filter out those NULL values.")
            self.runSQL("alter table {rel_state} add primary key "
                        "({col_grp_iteration} {primary_str})"
                        .format(primary_str=primary_str, **self.kwargs))
        self.inWith = True
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Mark the controller as no longer inside a with-block.

        The method returns None, so exceptions are not suppressed.
        """
        self.inWith = False

    def runSQL(self, sql):
        """Execute \\c sql through plpy, echoing it via plpy.notice when verbose."""
        if self.verbose:
            plpy.notice(sql)
        return plpy.execute(sql)

    @staticmethod
    def _group_array_literal(group_values):
        """
        Build a SQL <tt>text[]</tt> literal from a sequence of group values.

        Falsy values and the string 'null' (any case) become NULL::text.
        Every other value is converted with str() and quoted as a SQL string
        literal, with embedded single quotes doubled so the literal stays
        well formed.
        NOTE(review): backslashes are not escaped; with
        standard_conforming_strings off they would be interpreted. Confirm
        whether this matters for the supported backends.
        """
        items = []
        for value in group_values:
            if not value:
                items.append("NULL::text")
                continue
            text = str(value)
            if text.lower() == 'null':
                # NULL values should be output as NULL instead of as the
                # string 'NULL'
                items.append("NULL::text")
            else:
                items.append("'" + text.replace("'", "''") + "'::text")
        return "array[" + ",".join(items) + "]"

    def evaluate(self, expression):
        r"""
        Evaluate an expression per group and record groups that finished.

        The expression is substituted into a query template, and the
        resulting query is then formatted a second time with \c self.kwargs
        plus \c iteration (the current iteration). The expression may
        therefore use <tt>{iteration}</tt> and any controller placeholders.
        The following names are available in the expression:
        - \c _args - The (single-row) argument table
        - \c _state_previous - A group's state at the previous iteration
        - \c _state_current - A group's state at the current iteration
        - the grouping columns

        For every result row where the expression is true, the matching
        group's status element (last array element) at the current iteration
        is set to 1, unless it is already >= 2 (abnormal).

        @return None if the query returns no rows. Otherwise the SQL
            <tt>bool_and</tt> of every status element (cast to boolean) at
            the current iteration, i.e. True only when all groups have a
            nonzero status. It is None when that aggregate is NULL.
        """
        resultObject = self.runSQL(
            """
            SELECT
                ({expression}) AS _expression,
                ARRAY[{{grouping_str}}] AS _groups
            FROM {{rel_args}} AS _args
            left outer join (
                (
                    SELECT {{grouping_col}}, {col_grp_state} AS _state_previous
                    FROM {{rel_state}}
                    WHERE {col_grp_iteration} = {{iteration}} - 1
                ) sub1
                JOIN
                (
                    SELECT {{grouping_col}}, {col_grp_state} AS _state_current
                    FROM {{rel_state}}
                    WHERE {col_grp_iteration} = {{iteration}}
                ) sub2
                {using_str}
            ) AS subq1 ON True
            """.format(expression=expression, **self.kwargs)
               .format(iteration=self.iteration, **self.kwargs))
        if resultObject.nrows() == 0:
            return None
        for each_elem in resultObject:
            array_str = self._group_array_literal(mad_vec(each_elem["_groups"]))
            # update status for the group if it completed iterating
            if each_elem['_expression']:
                self.runSQL("""
                    UPDATE {rel_state} set {col_grp_state}[array_upper({col_grp_state}, 1)] = 1
                    WHERE
                        ARRAY[{grouping_str}] = {_group_val} and
                        {col_grp_state}[array_upper({col_grp_state}, 1)] < 2 and
                        {col_grp_iteration} = {iteration}
                    """.format(
                        _group_val=array_str,
                        iteration=self.iteration,
                        **self.kwargs))
        # return True only if all group combinations have finished iterating
        return self.runSQL(
            """
            select bool_and({col_grp_state}[array_upper({col_grp_state}, 1)]::integer::boolean) as rst
            from {rel_state} as _state_table
            where _state_table.{col_grp_iteration} = {iteration}
            """.format(
                iteration=self.iteration,
                **self.kwargs))[0]["rst"]

    def test(self, condition):
        r"""
        Test a boolean condition per group. This is evaluate() with the
        condition cast to BOOLEAN.

        @param condition Boolean SQL expression. The names and placeholders
            available to evaluate() (\c _args, \c _state_previous,
            \c _state_current, <tt>{iteration}</tt>, ...) may be used.
        @return Whatever evaluate() returns for the cast condition.
        """
        return self.evaluate("CAST(({0}) AS BOOLEAN)".format(condition))

    def update(self, newState, **updateKwargs):
        r"""
        Compute and append the state for the next iteration.

        @param newState SQL expression of type <tt>self.kwargs.stateType</tt>.
            It is first formatted with \c self.kwargs and \c iteration. Note
            that <tt>{iteration}</tt> is still the current iteration at that
            point. The following names are available in the expression:
            - \c _src - Rows of \c rel_source
            - \c rel_state - The group's state row at the previous iteration
            When grouping, the query groups by the grouping columns, so
            \c newState is presumably an aggregate call.
        @param updateKwargs Accepted for interface compatibility; not used.

        This increments the iteration counter and inserts one new state row
        per processed group. When the new iteration is 1, every group is
        processed. Later, only groups whose previous status element is 0 (in
        progress) are processed. Older states are kept.
        """
        newState = newState.format(iteration=self.iteration, **self.kwargs)
        self.iteration += 1
        groupby_str = ("" if self.is_group_null
                       else "group by {grouping_col}".format(**self.kwargs))
        self.runSQL(
            """
            insert into {rel_state}
            (select
                {grouping_col},
                {iteration},
                ({newState})
            from
                ({rel_source} AS _src
                join
                {rel_state} AS rel_state
                {using_str})
            where
                rel_state.{col_grp_iteration} = {iteration} - 1 and
                (case when {iteration} = 1 then
                    True
                else
                    rel_state.{col_grp_state}[array_upper(rel_state.{col_grp_state}, 1)] = 0
                end)
            {groupby_str})
            """.format(
                groupby_str=groupby_str,
                iteration=self.iteration,
                newState=newState,
                **self.kwargs))
m4_changequote(<!`!>, <!'!>)