# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import plpy
import string
import re
from control import MinWarning
from utilities import unique_string, _assert, split_quoted_delimited_str
from validate_args import get_cols
from validate_args import input_tbl_valid, output_tbl_valid, is_var_valid
def sessionize(schema_madlib, source_table, output_table, partition_expr,
time_stamp, max_time, output_cols, create_view, **kwargs):
Perform sessionization over a sequence of rows.
@param schema_madlib: str, Name of the MADlib schema
@param source_table: str, Name of the input table/view
@param output_table: str, Name of the table to store result
@param partition_expr: str, Expression to partition (group) the input data
@param time_stamp: str, The time stamp column name that is used for sessionization calculation
@param max_time: interval, Delta time between subsequent events to define a session
@param output_cols: str, a valid postgres SELECT expression (default '*')
@param create_view: boolean, indicates if the output is a view or a table with name
specified by output_table (default TRUE):
TRUE - create view
FALSE - materialize results into a table
with MinWarning("error"):
if not partition_expr:
partition_expr = "1 = 1"
_validate(source_table, output_table, partition_expr, time_stamp, max_time)
# Checking for 'None' too since create_view and output_cols could be
# explicitly set to NULL by the user.
table_or_view = 'VIEW' if create_view or create_view is None else 'TABLE'
# Empty string ('') for output_cols is also set to default value '*'
output_cols = '*' if not output_cols or output_cols is None else output_cols
# If the output_cols has '*' as one of the elements, expand it to
# include all columns in the source table. The following list
# comprehension is only to handle the case where '*' is included
# in output_cols. Using '*' as is, without expanding it to specific
# column names leads to some temporary intermediate columns
# (new_partition and new_session defined below) occurring in the output.
cols_to_project_list = [', '.join(get_cols(source_table, schema_madlib)) if i=='*' else i
for i in split_quoted_delimited_str(output_cols)]
# Examples of Invalid SELECT expression in output_cols:
# 1) If output_cols contains '*' along with an existing column name
# in the source table, postgres will throw an error and fail
# for specifying duplicate column names in the output table/view.
# 2) If output_cols contains more than 1 expressions which are not
# renamed using ' AS ', postgres will fail since it will try to
# rename all such new columns as '?column?'. This is considered an
# invalid SELECT expression.
cols_to_project = ', '.join(cols_to_project_list)
session_id = 'session_id' if not is_var_valid(source_table, 'session_id')\
else unique_string('session_id')
# Create temp column names for intermediate columns.
new_partition = unique_string('new_partition')
new_session = unique_string('new_session')
CREATE {table_or_view} {output_table} AS
CASE WHEN {time_stamp} IS NOT NULL
THEN SUM(CASE WHEN {new_partition} OR {new_session} THEN 1 END)
OVER (PARTITION BY {partition_expr} ORDER BY {time_stamp})
END AS {session_id}
ROW_NUMBER() OVER (w) = 1 AND {time_stamp} IS NOT NULL AS {new_partition},
({time_stamp}-LAG({time_stamp}, 1) OVER (w)) > '{max_time}'::INTERVAL AS {new_session}
FROM {source_table} WINDOW w AS (PARTITION BY {partition_expr} ORDER BY {time_stamp})
) a
except plpy.SPIError as e:
# The specific exception we want to catch here is
# "spiexceptions.DuplicateColumn", but not all platforms have it
# defined. So catching a more generic exception and displaying this
# warning message.
with MinWarning("warning"):
plpy.warning("A plausible error condition: the output_cols"
"parameter might be an invalid SELECT expression, "
"resulting in duplicate column names.")
def _validate(source_table, output_table, partition_expr, time_stamp, max_time):
input_tbl_valid(source_table, 'Sessionization')
output_tbl_valid(output_table, 'Sessionization')
# ensure the expressions are not None or empty strings
_assert(partition_expr, "Sessionization error: Invalid partition expression")
_assert(time_stamp, "Sessionization error: Invalid time stamp column")
_assert(max_time, "Sessionization error: Invalid max time value")
# ensure the partition/order expression can actually be used
_assert(is_var_valid(source_table, partition_expr, time_stamp),
"Sessionization error: Invalid partition expression or time stamp column name")
def sessionize_help_message(schema_madlib, message, **kwargs):
Help message for sessionize function
summary_string = """
Functionality: Sessionize
The MADlib sessionize function performs time-oriented session reconstruction on a
data set comprising a sequence of events. A defined period of inactivity indicates
the end of one session and beginning of the next session.
For more details on function usage:
SELECT {schema_madlib}.sessionize('usage');
For a small example on using the function:
SELECT {schema_madlib}.sessionize('example');
usage_string = """
SELECT {schema_madlib}.sessionize(
'source_table', -- str, Name of the source table that contains the data to
-- be sessionized
'output_table', -- str, Name of the output view or table
'partition_expr', -- str, Partition expression to group the data table
'time_stamp' -- str, The time stamp column name that is used for
-- sessionization calculation
'max_time' -- interval, Delta time between subsequent events to define
-- a session
'output_cols' -- str, An optional valid postgres SELECT expression for the
-- output table/view (default *)
'create_view' -- boolean, Optional parameter to specify if output is a
-- view or materilized to a table (default True)
example_string = """
- Create an input data set:
CREATE TABLE eventlog (event_timestamp TIMESTAMP,
user_id INT,
page TEXT,
revenue FLOAT);
('04/15/2015 02:19:00', 101331, 'CHECKOUT', 16),
('04/15/2015 02:17:00', 202201, 'WINE', 0),
('04/15/2015 03:18:00', 202201, 'BEER', 0),
('04/15/2015 01:03:00', 100821, 'LANDING', 0),
('04/15/2015 01:04:00', 100821, 'WINE', 0),
('04/15/2015 01:05:00', 100821, 'CHECKOUT', 39),
('04/15/2015 02:06:00', 100821, 'WINE', 0),
('04/15/2015 02:09:00', 100821, 'WINE', 0),
('04/15/2015 02:15:00', 101331, 'LANDING', 0),
('04/15/2015 02:16:00', 101331, 'WINE', 0),
('04/15/2015 02:17:00', 101331, 'HELP', 0),
('04/15/2015 02:18:00', 101331, 'WINE', 0),
('04/15/2015 02:29:00', 201881, 'LANDING', 0),
('04/15/2015 02:30:00', 201881, 'BEER', 0),
('04/15/2015 01:05:00', 202201, 'LANDING', 0),
('04/15/2015 01:06:00', 202201, 'HELP', 0),
('04/15/2015 01:09:00', 202201, 'LANDING', 0),
('04/15/2015 02:15:00', 202201, 'WINE', 0),
('04/15/2015 02:16:00', 202201, 'BEER', 0),
('04/15/2015 03:19:00', 202201, 'WINE', 0),
('04/15/2015 03:22:00', 202201, 'CHECKOUT', 21);
- Sessionize the table for each user_id, and obtain only the user_id, with partition
expression, event_timestamp and session_id:
SELECT {schema_madlib}.sessionize(
'eventlog', -- Name of input table
'sessionize_output', -- Table name to store sessionized results
'user_id', -- Partition input table by session
'event_timestamp', -- Order partitions in input table by time
'0:30:0' -- Use 30 minute time out to define sessions
- View the output table containing the session IDs:
SELECT * FROM sessionize_output;
DROP VIEW sessionize_output;
- Sessionize the table for each user_id, and materialize all columns from
source table into an output table:
SELECT {schema_madlib}.sessionize(
'eventlog', -- Name of input table
'sessionize_output', -- Table name to store sessionized results
'user_id < 200000', -- Partition input table by session
'event_timestamp', -- Order partitions in input table by time
'180', -- Use 3 minutes (180 seconds) to define sessions
'event_timestamp, user_id, user_id < 200000 AS "Department-A1"',
-- Select only the required columns, along with the
-- session id column that is selected by default
'false' -- Materialize results into a table, and not a view
- View the output table containing the session IDs:
SELECT * FROM sessionize_output WHERE "Department-A1"='TRUE';
DROP TABLE sessionize_output;
help_string = summary_string
if not message:
return summary_string
elif message.lower() in ('usage', 'help', '?'):
return usage_string
elif message.lower() == 'example':
return example_string
return """
No such option. Use "SELECT {schema_madlib}.sessionize()" for help.