| # coding=utf-8 |
| m4_changequote(`>>>', `<<<') |
| |
| """ |
| @file control.py_in |
| |
| @brief controller classes (e.g. iteration controller) |
| |
| @namespace utilities |
| |
| @brief driver functions shared by modules |
| """ |
| |
| import plpy |
| from control import MinWarning |
| from utilities import __mad_version |
| from utilities import unique_string |
# Module-local alias for the imported unique_string helper; the controller
# class below calls the helper through this name.
_unique_string = unique_string

# mad_vec is whatever select_vecfunc() of the version helper returns; the
# controller applies it to the "_groups" array of each result row in evaluate().
version_wrapper = __mad_version()
mad_vec = version_wrapper.select_vecfunc()
| |
| class GroupIterationController: |
| """ |
| @brief Abstraction for implementing driver functions in PL/Python |
| |
| This class encapsulates handling of the inter-iteration state. The design |
| goal is to avoid any conversion between backend-native types and those of |
| procedural languages like PL/Python. Therefore, the expectation is that |
| |
| *** all "template" parameters are passed as PL/Python arguments ***, |
| |
| whereas non-template arguments are provided in an argument table. Here, |
| "template" arguments are those parameters that cannot be SQL parameters, |
| |
| *** such as table and column names ***. |
| |
| This class assumes a transition state always has its status indicator |
| in the last element. 0 means in progress, 1 means completed, > 1 means |
| abnormal. Perhaps a C++ UDF should be added for extracting the status. |
| |
| The inter-state iteration table contains three columns: |
| - <tt>_grouping_cols</tt> - List of columns that are provided as grouping |
| arguments |
| - <tt>_iteration INTEGER</tt> - The 0-based iteration number |
| - <tt>_state <em>self.kwargs.stateType</em></tt> - The state (after |
| iteration \c _iteration) |
| """ |
| |
| def __init__(self, rel_args, rel_state, stateType, |
| temporaryTables = True, |
| schema_madlib = "MADLIB_SCHEMA_MISSING", |
| verbose = False, |
| grouping_str = "NULL", |
| col_grp_iteration="_iteration", |
| col_grp_state="_state", |
| **kwargs): |
| self.temporaryTables = temporaryTables |
| self.verbose = verbose |
| self.inWith = False |
| self.iteration = -1 |
| self.grouping_str = grouping_str |
| self.kwargs = kwargs |
| self.kwargs.update( |
| unqualified_rel_state = rel_state, |
| rel_args = ('pg_temp.' if temporaryTables else '') + rel_args, |
| rel_state = ('pg_temp.' if temporaryTables else '') + rel_state, |
| stateType = stateType.format(schema_madlib = schema_madlib), |
| schema_madlib = schema_madlib, |
| grouping_str = self.grouping_str, |
| col_grp_null = _unique_string(), |
| col_grp_key = _unique_string(), |
| col_grp_iteration=col_grp_iteration, |
| col_grp_state=col_grp_state |
| ) |
| grouping_col = "Null" if kwargs["grouping_col"] is None \ |
| else kwargs["grouping_col"] |
| using_str = "on True" if kwargs["grouping_col"] is None \ |
| else "using ({grouping_col})".format(**kwargs) |
| self.is_group_null = True if kwargs["grouping_col"] is None else False |
| self.kwargs["grouping_col"] = grouping_col |
| self.kwargs["using_str"] = using_str |
| self.grouping_col = grouping_col |
| m4_ifdef(>>>__HAWQ__<<<, >>> |
| self.new_states = [] |
| self.old_states = [] |
| self.finished_states = [] |
| <<<) |
| |
| def __enter__(self): |
| with MinWarning('warning'): |
| # currently assuming that groups is passed as a valid array |
| group_col = ("NULL::integer as {col_grp_null}" if self.is_group_null \ |
| else "{grouping_col}").format(**self.kwargs) |
| groupby_str = ("{col_grp_null}" if self.is_group_null \ |
| else "{grouping_col}").format(**self.kwargs) |
| primary_str = "" if self.is_group_null else ", {grouping_col}".format(**self.kwargs) |
| |
| self.runSQL( |
| """ |
| drop table if exists {rel_state}; |
| create {temp} table {unqualified_rel_state} as ( |
| select |
| {group_col}, |
| 0::integer as {col_grp_iteration}, |
| Null::{stateType} as {col_grp_state} |
| m4_ifdef(>>>__HAWQ__<<<, >>>, array_to_string(ARRAY[{grouping_str}], ',') AS {col_grp_key}<<<) |
| from {rel_source} |
| group by {groupby_str} |
| ); |
| m4_ifdef(>>>__POSTGRESQL__<<<, >>><<<, >>> |
| alter table {rel_state} set distributed by ({col_grp_iteration} {primary_str}); |
| <<<) |
| """.format(group_col=group_col, groupby_str=groupby_str, primary_str=primary_str, |
| temp='TEMPORARY' if self.temporaryTables else '', **self.kwargs)) |
| null_test = " or ".join([g.strip() + " is NULL" for g in |
| self.kwargs['grouping_col'].split(",")]) |
| null_count = plpy.execute( |
| """ |
| select count(*) from {rel_state} where {null_test} |
| """.format(null_test=null_test, **self.kwargs))[0]['count'] |
| if null_count != 0 and primary_str != "": |
| plpy.error("Grouping error: at least one of the grouping columns contains NULL values!" |
| " Please filter out those NULL values.") |
| self.runSQL(""" |
| m4_ifdef(>>>__HAWQ__<<<, >>><<<, >>> |
| alter table {rel_state} add primary key ({col_grp_iteration} {primary_str}) |
| <<<) |
| """.format(primary_str=primary_str, **self.kwargs)) |
| m4_ifdef(>>>__HAWQ__<<<, >>> |
| self.new_states = plpy.execute("SELECT * FROM {rel_state}".format(**self.kwargs)) |
| if not self.is_group_null: |
| converted_source_table = _unique_string() |
| plpy.execute(""" |
| CREATE TEMP TABLE {converted_source_table} AS |
| SELECT *, array_to_string(ARRAY[{grouping_str}], ',') AS {col_grp_key} FROM {rel_source} |
| """.format(converted_source_table=converted_source_table, **self.kwargs)) |
| self.kwargs.update(rel_source=converted_source_table) |
| <<<) |
| self.inWith = True |
| return self |
| |
| def __exit__(self, type, value, tb): |
| self.inWith = False |
| m4_ifdef(>>>__HAWQ__<<<, >>> |
| insert_plan = plpy.prepare(""" |
| INSERT INTO {rel_state} |
| SELECT |
| {grouping_col}, |
| _rel_state.iteration, |
| _rel_state.state, |
| _rel_state.grp_key |
| FROM |
| ( |
| SELECT |
| {grouping_col}, {col_grp_key} |
| FROM {rel_state} |
| WHERE {col_grp_iteration} = 0 |
| ) AS _src, |
| ( |
| SELECT |
| grp_key, iteration, state |
| FROM |
| {schema_madlib}._gen_state($1, $2, $3) |
| ) AS _rel_state |
| WHERE _src.{col_grp_key} = _rel_state.grp_key |
| """.format(**self.kwargs), ["text[]", "integer[]", "double precision[]"]) |
| |
| grp_keys = [] |
| states = [] |
| iterations = [] |
| null_empty = [] |
| for state in self.finished_states: |
| # the lenght of states for null_empty groups are different |
| if state["{col_grp_state}".format(**self.kwargs)][-1] == 3: |
| null_empty.append(state) |
| continue |
| iterations.append(state["{col_grp_iteration}".format(**self.kwargs)]) |
| grp_keys.append(state["{col_grp_key}".format(**self.kwargs)]) |
| states.extend(state["{col_grp_state}".format(**self.kwargs)]) |
| plpy.execute(insert_plan, [grp_keys, iterations, states]) |
| |
| grp_keys = [] |
| states = [] |
| iterations = [] |
| for state in null_empty: |
| iterations.append(state["{col_grp_iteration}".format(**self.kwargs)]) |
| grp_keys.append(state["{col_grp_key}".format(**self.kwargs)]) |
| states.extend(state["{col_grp_state}".format(**self.kwargs)]) |
| plpy.execute(insert_plan, [grp_keys, iterations, states]) |
| |
| if not self.is_group_null: |
| plpy.execute('DROP TABLE IF EXISTS ' + self.kwargs['rel_source']) |
| <<<) |
| |
| def runSQL(self, sql): |
| if self.verbose: |
| plpy.notice(sql) |
| return plpy.execute(sql) |
| |
| def evaluate(self, expression): |
| """ |
| Evaluate the given expression. The expression may depend on |
| the current inter-iteration state and all arguments |
| |
| @param expression SQL expression. The |
| following names are defined and can be used in the condition: |
| - \c _args - The (single-row) argument table |
| - \c _state - The row of the state table containing the latest |
| inter-iteration state |
| @return None if \c expression evaluates to NULL, otherwise the value of |
| \c expression |
| """ |
| m4_ifdef(>>>__HAWQ__<<<, >>> |
| eva_plan = plpy.prepare(""" |
| SELECT |
| ({expression}) AS _expression, |
| subq1.grp_key AS _groups |
| FROM {{rel_args}} AS _args |
| left outer join ( |
| ( |
| SELECT grp_key, state AS _state_previous |
| FROM {schema_madlib}._gen_state($1, NULL, $2) |
| ) sub1 |
| JOIN |
| ( |
| SELECT grp_key, state AS _state_current |
| FROM {schema_madlib}._gen_state($3, NULL, $4) |
| ) sub2 |
| using (grp_key) |
| ) AS subq1 ON True |
| """.format(expression = expression, **self.kwargs).format( |
| iteration=self.iteration, **self.kwargs), |
| ["text[]", "double precision[]", "text[]", "double precision[]"]) |
| |
| grp_keys_new = [] |
| states_new = [] |
| for state in self.new_states: |
| if state["{col_grp_state}".format(**self.kwargs)][-1] < 2: |
| grp_keys_new.append(state["{col_grp_key}".format(**self.kwargs)]) |
| states_new.extend(state["{col_grp_state}".format(**self.kwargs)]) |
| grp_keys_old = [] |
| states_old = [] |
| for state in self.old_states: |
| grp_keys_old.append(state["{col_grp_key}".format(**self.kwargs)]) |
| if state["{col_grp_state}".format(**self.kwargs)] is not None: |
| states_old.extend(state["{col_grp_state}".format(**self.kwargs)]) |
| |
| resultObject = plpy.execute(eva_plan, [grp_keys_old, states_old, grp_keys_new, states_new]) |
| <<<, |
| >>>resultObject = self.runSQL( |
| """ |
| SELECT |
| ({expression}) AS _expression, |
| ARRAY[{{grouping_str}}] AS _groups |
| FROM {{rel_args}} AS _args |
| left outer join ( |
| ( |
| SELECT {{grouping_col}}, {col_grp_state} AS _state_previous |
| FROM {{rel_state}} |
| WHERE {col_grp_iteration} = {{iteration}} - 1 |
| ) sub1 |
| JOIN |
| ( |
| SELECT {{grouping_col}}, {col_grp_state} AS _state_current |
| FROM {{rel_state}} |
| WHERE {col_grp_iteration} = {{iteration}} |
| ) sub2 |
| {using_str} |
| ) AS subq1 ON True |
| """.format(expression = expression, **self.kwargs).format( |
| iteration=self.iteration, **self.kwargs)) |
| <<<) |
| |
| if resultObject.nrows() == 0: |
| return None |
| else: |
| m4_ifdef(>>>__HAWQ__<<<, >>>complete_grps = []<<<) |
| for each_elem in resultObject: |
| m4_ifdef(>>>__HAWQ__<<<, >>><<<, >>> |
| # update status for each group |
| group_vector = mad_vec(each_elem["_groups"]) |
| groups_as_str = [None] * len(group_vector) |
| # convert group values to string objects |
| for index, each_grp in enumerate(group_vector): |
| if not each_grp or each_grp.lower() == 'null': |
| # NULL values should be outputed as NULL instead of |
| # as a string 'NULL' |
| groups_as_str[index] = "NULL::text" |
| else: |
| groups_as_str[index] = "'" + str(each_grp) + "'::text" |
| array_str = "array[" + ",".join(groups_as_str) + "]" |
| <<<) |
| # update status for the group if it completed iterating |
| if each_elem['_expression']: |
| m4_ifdef(>>>__HAWQ__<<<, |
| >>>complete_grps.append(each_elem["_groups"])<<<, |
| >>>self.runSQL( |
| """UPDATE {rel_state} set {col_grp_state}[array_upper({col_grp_state}, 1)] = 1 |
| WHERE |
| ARRAY[{grouping_str}] = {_group_val} and |
| {col_grp_state}[array_upper({col_grp_state}, 1)] < 2 and |
| {col_grp_iteration} = {iteration} |
| """.format( |
| _group_val=array_str, |
| iteration=self.iteration, |
| **self.kwargs)) |
| <<<) |
| |
| m4_ifdef(>>>__HAWQ__<<<, >>> |
| all_finish = True |
| tmp = [] |
| for state in self.new_states: |
| if state["{col_grp_key}".format(**self.kwargs)] in complete_grps \ |
| and state["{col_grp_state}".format(**self.kwargs)][-1] < 1: |
| state["{col_grp_state}".format(**self.kwargs)][-1] = 1 |
| if state["{col_grp_state}".format(**self.kwargs)][-1] < 1: |
| all_finish = False |
| tmp.append(state) |
| else: |
| self.finished_states.append(state) |
| self.new_states = tmp |
| return all_finish |
| <<<, >>> |
| # return True only if all group combinations have finished iterating |
| rv = self.runSQL( |
| """ |
| select bool_and({col_grp_state}[array_upper({col_grp_state}, 1)]::integer::boolean) as rst |
| from {rel_state} as _state_table |
| where _state_table.{col_grp_iteration} = {iteration} |
| """.format( |
| iteration=self.iteration, |
| **self.kwargs))[0]["rst"] |
| return rv |
| <<<) |
| |
| def test(self, condition): |
| """ |
| Test if the given condition is satisfied. The condition may depend on |
| the current inter-iteration state and all arguments |
| |
| @param condition Boolean SQL expression. The |
| following names are defined and can be used in the condition: |
| - \c _args - The (single-row) argument table |
| - \c _state - The row of the state table containing the latest |
| inter-iteration state |
| @return None if \c condition evaluates to NULL, otherwise the Boolean |
| value of \c condition |
| """ |
| return self.evaluate( |
| """ |
| CAST(({condition}) AS BOOLEAN) |
| """.format(condition = condition)) |
| |
| def update(self, newState, **updateKwargs): |
| """ |
| Update the inter-iteration state |
| |
| @param newState SQL expression of type |
| <tt>stateType.kwargs.stateType</tt>. The |
| following names are defined and can be used in the condition: |
| - \c _args - The (single-row) argument table |
| - \c _state - The row of the state table containing the latest |
| inter-iteration state |
| . |
| Note that <tt>{iteration}</tt> will still be the current iteration. |
| For instance, it could be used in the expression as a WHERE |
| condition: <tt>[...] WHERE _state._iteration = {iteration}</tt> |
| |
| This updates the current inter-iteration state to the result of |
| evaluating \c newState. If <tt>self.truncAfterIteration</tt> is true, |
| this will replace the old state, otherwise the history of all old states |
| is kept. |
| """ |
| newState = newState.format(**self.kwargs) |
| self.iteration = self.iteration + 1 |
| |
| groupby_str = "" if self.is_group_null \ |
| else "group by {grouping_col}, {col_grp_key}".format(**self.kwargs) |
| |
| m4_ifdef(>>>__HAWQ__<<<, >>> |
| groupby_str = "" if self.is_group_null \ |
| else "group by {grouping_col}, {col_grp_key}".format(**self.kwargs) |
| update_plan = plpy.prepare( |
| """ |
| select |
| {_grp_key} AS {col_grp_key}, |
| {grouping_col} {as_string}, |
| {iteration} AS {col_grp_iteration}, |
| ({newState}) AS {col_grp_state} |
| from |
| {rel_source} AS _src, |
| ( |
| SELECT |
| grp_key, state AS {col_grp_state} |
| FROM |
| {schema_madlib}._gen_state($1, NULL, $2) |
| ) AS rel_state |
| where |
| {_grp_key} = grp_key |
| {groupby_str} |
| """.format( |
| iteration=self.iteration, |
| groupby_str=groupby_str, |
| as_string='AS _grp' if self.is_group_null else '', |
| _grp_key="array_to_string(ARRAY[{grouping_str}], ',')".format(**self.kwargs) if self.is_group_null else self.kwargs['col_grp_key'], |
| newState = newState, |
| **self.kwargs), ["text[]", "double precision[]"]) |
| grp_keys = [] |
| states = [] |
| for state in self.new_states: |
| grp_keys.append(state["{col_grp_key}".format(**self.kwargs)]) |
| if state["{col_grp_state}".format(**self.kwargs)] is not None: |
| states.extend(state["{col_grp_state}".format(**self.kwargs)]) |
| |
| self.old_states = self.new_states |
| self.new_states = plpy.execute(update_plan, [grp_keys, states]) |
| <<<, |
| >>> |
| groupby_str = "" if self.is_group_null \ |
| else "group by {grouping_col}".format(**self.kwargs) |
| self.runSQL( |
| """ |
| insert into {rel_state} |
| (select |
| {grouping_col}, |
| {iteration}, |
| ({newState}) |
| from |
| ({rel_source} AS _src |
| join |
| {rel_state} AS rel_state |
| {using_str}) |
| where |
| rel_state.{col_grp_iteration} = {iteration} - 1 and |
| (case when {iteration} = 1 then |
| True |
| else |
| rel_state.{col_grp_state}[array_upper(rel_state.{col_grp_state}, 1)] = 0 |
| end) |
| {groupby_str}) |
| """.format( |
| groupby_str = groupby_str, |
| iteration = self.iteration, |
| newState = newState, |
| **self.kwargs)) |
| <<<) |
| |
| m4_changequote(>>>`<<<, >>>'<<<) |