| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import plpy |
| import string |
| import re |
| |
| from control import MinWarning |
| from utilities import unique_string, _assert, split_quoted_delimited_str |
| from validate_args import get_cols |
| from validate_args import input_tbl_valid, output_tbl_valid, is_var_valid |
| |
| m4_changequote(`<!', `!>') |
| |
| |
| def sessionize(schema_madlib, source_table, output_table, partition_expr, |
| time_stamp, max_time, output_cols, create_view, **kwargs): |
| """ |
| Perform sessionization over a sequence of rows. |
| |
| Args: |
| @param schema_madlib: str, Name of the MADlib schema |
| @param source_table: str, Name of the input table/view |
| @param output_table: str, Name of the table to store result |
| @param partition_expr: str, Expression to partition (group) the input data |
| @param time_stamp: str, The time stamp column name that is used for sessionization calculation |
| @param max_time: interval, Delta time between subsequent events to define a session |
| @param output_cols: str, a valid postgres SELECT expression (default '*') |
| @param create_view: boolean, indicates if the output is a view or a table with name |
| specified by output_table (default TRUE): |
| TRUE - create view |
| FALSE - materialize results into a table |
| """ |
| with MinWarning("error"): |
| if not partition_expr: |
| partition_expr = "1 = 1" |
| _validate(source_table, output_table, partition_expr, time_stamp, max_time) |
| # Checking for 'None' too since create_view and output_cols could be |
| # explicitly set to NULL by the user. |
| table_or_view = 'VIEW' if create_view or create_view is None else 'TABLE' |
| # Empty string ('') for output_cols is also set to default value '*' |
| output_cols = '*' if not output_cols or output_cols is None else output_cols |
| |
| # If the output_cols has '*' as one of the elements, expand it to |
| # include all columns in the source table. The following list |
| # comprehension is only to handle the case where '*' is included |
| # in output_cols. Using '*' as is, without expanding it to specific |
| # column names leads to some temporary intermediate columns |
| # (new_partition and new_session defined below) occurring in the output. |
| cols_to_project_list = [', '.join(get_cols(source_table)) if i=='*' else i |
| for i in split_quoted_delimited_str(output_cols)] |
| |
| # Examples of Invalid SELECT expression in output_cols: |
| # 1) If output_cols contains '*' along with an existing column name |
| # in the source table, postgres will throw an error and fail |
| # for specifying duplicate column names in the output table/view. |
| # 2) If output_cols contains more than 1 expressions which are not |
| # renamed using ' AS ', postgres will fail since it will try to |
| # rename all such new columns as '?column?'. This is considered an |
| # invalid SELECT expression. |
| cols_to_project = ', '.join(cols_to_project_list) |
| |
| session_id = 'session_id' if not is_var_valid(source_table, 'session_id')\ |
| else unique_string('session_id') |
| |
| # Create temp column names for intermediate columns. |
| new_partition = unique_string('new_partition') |
| new_session = unique_string('new_session') |
| |
| try: |
| plpy.execute(""" |
| CREATE {table_or_view} {output_table} AS |
| SELECT |
| {cols_to_project}, |
| CASE WHEN {time_stamp} IS NOT NULL |
| THEN SUM(CASE WHEN {new_partition} OR {new_session} THEN 1 END) |
| OVER (PARTITION BY {partition_expr} ORDER BY {time_stamp}) |
| END AS {session_id} |
| FROM ( |
| SELECT *, |
| ROW_NUMBER() OVER (w) = 1 AND {time_stamp} IS NOT NULL AS {new_partition}, |
| ({time_stamp}-LAG({time_stamp}, 1) OVER (w)) > '{max_time}'::INTERVAL AS {new_session} |
| FROM {source_table} WINDOW w AS (PARTITION BY {partition_expr} ORDER BY {time_stamp}) |
| ) a |
| """.format(**locals())) |
| except plpy.SPIError as e: |
| # The specific exception we want to catch here is |
| # "spiexceptions.DuplicateColumn", but not all platforms have it |
| # defined. So catching a more generic exception and displaying this |
| # warning message. |
| with MinWarning("warning"): |
| plpy.warning("A plausible error condition: the output_cols" |
| "parameter might be an invalid SELECT expression, " |
| "resulting in duplicate column names.") |
| raise |
| |
| |
| def _validate(source_table, output_table, partition_expr, time_stamp, max_time): |
| input_tbl_valid(source_table, 'Sessionization') |
| output_tbl_valid(output_table, 'Sessionization') |
| # ensure the expressions are not None or empty strings |
| _assert(partition_expr, "Sessionization error: Invalid partition expression") |
| _assert(time_stamp, "Sessionization error: Invalid time stamp column") |
| _assert(max_time, "Sessionization error: Invalid max time value") |
| # ensure the partition/order expression can actually be used |
| _assert(is_var_valid(source_table, partition_expr, time_stamp), |
| "Sessionization error: Invalid partition expression or time stamp column name") |
| |
| |
| def sessionize_help_message(schema_madlib, message, **kwargs): |
| """ |
| Help message for sessionize function |
| """ |
| summary_string = """ |
| ----------------------------------------------------------------------------------- |
| SUMMARY |
| ----------------------------------------------------------------------------------- |
| Functionality: Sessionize |
| |
| The MADlib sessionize function performs time-oriented session reconstruction on a |
| data set comprising a sequence of events. A defined period of inactivity indicates |
| the end of one session and beginning of the next session. |
| |
| For more details on function usage: |
| SELECT {schema_madlib}.sessionize('usage'); |
| """.format(schema_madlib=schema_madlib) |
| |
| usage_string = """ |
| ----------------------------------------------------------------------------------- |
| USAGE |
| ----------------------------------------------------------------------------------- |
| SELECT {schema_madlib}.sessionize( |
| 'source_table', -- str, Name of the source table that contains the data to |
| -- be sessionized |
| 'output_table', -- str, Name of the output view or table |
| 'partition_expr', -- str, Partition expression to group the data table |
| 'time_stamp' -- str, The time stamp column name that is used for |
| -- sessionization calculation |
| 'max_time' -- interval, Delta time between subsequent events to define |
| -- a session |
| 'output_cols' -- str, An optional valid postgres SELECT expression for the |
| -- output table/view (default *) |
| 'create_view' -- boolean, Optional parameter to specify if output is a |
| -- view or materilized to a table (default True) |
| ); |
| """.format(schema_madlib=schema_madlib) |
| |
| help_string = summary_string |
| |
| if not message: |
| return summary_string |
| elif message.lower() in ('usage', 'help', '?'): |
| return usage_string |
| else: |
| return """ |
| No such option. Use "SELECT {schema_madlib}.sessionize()" for help. |
| """.format(schema_madlib=schema_madlib) |