blob: 0d593e60b9a4d3651a982f031c137b480d6ad0f1 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import plpy
import string
import re
from control import MinWarning
from utilities import unique_string, _assert, split_quoted_delimited_str
from validate_args import get_cols
from validate_args import input_tbl_valid, output_tbl_valid, is_var_valid
m4_changequote(`<!', `!>')
def sessionize(schema_madlib, source_table, output_table, partition_expr,
               time_stamp, max_time, output_cols, create_view, **kwargs):
    """
    Perform sessionization over a sequence of rows.

    Creates a view or table named output_table that contains the projected
    columns plus a session id column. Within each partition, rows are ordered
    by time_stamp; a new session starts at the first row with a non-NULL
    time stamp and whenever the gap to the previous row exceeds max_time.
    Rows whose time stamp is NULL get a NULL session id.

    Args:
        @param schema_madlib: str, Name of the MADlib schema
        @param source_table: str, Name of the input table/view
        @param output_table: str, Name of the table to store result
        @param partition_expr: str, Expression to partition (group) the input
                               data. An empty/NULL value is replaced by the
                               constant expression "1 = 1" (one partition).
        @param time_stamp: str, The time stamp column name that is used for
                           sessionization calculation
        @param max_time: interval, Delta time between subsequent events to
                         define a session
        @param output_cols: str, a valid postgres SELECT expression
                            (default '*')
        @param create_view: boolean, indicates if the output is a view or a
                            table with name specified by output_table
                            (default TRUE):
                                TRUE - create view
                                FALSE - materialize results into a table

    Raises:
        plpy.SPIError: re-raised unchanged when the CREATE statement fails,
                       after a warning hinting at an invalid output_cols
                       expression has been emitted.
    """
    with MinWarning("error"):
        if not partition_expr:
            partition_expr = "1 = 1"
        _validate(source_table, output_table, partition_expr, time_stamp, max_time)

        # create_view may be passed explicitly as NULL (None) by the user;
        # that case falls back to the default, which is a view.
        table_or_view = 'VIEW' if create_view or create_view is None else 'TABLE'

        # Both NULL (None) and the empty string select the default '*'.
        if not output_cols:
            output_cols = '*'

        # If output_cols has '*' as one of its elements, expand it to the
        # column names of the source table. Passing '*' through unexpanded
        # would leak the temporary intermediate columns (new_partition and
        # new_session, defined below) into the output.
        cols_to_project_list = [
            ', '.join(get_cols(source_table)) if expr == '*' else expr
            for expr in split_quoted_delimited_str(output_cols)
        ]

        # Examples of invalid SELECT expressions in output_cols:
        # 1) If output_cols contains '*' along with an existing column name
        #    in the source table, postgres will throw an error and fail
        #    for specifying duplicate column names in the output table/view.
        # 2) If output_cols contains more than one expression that is not
        #    renamed using ' AS ', postgres will fail since it will try to
        #    name all such new columns '?column?'.
        cols_to_project = ', '.join(cols_to_project_list)

        # Use the plain name 'session_id' unless that is already a valid
        # expression on the source table; then use a generated unique name.
        if is_var_valid(source_table, 'session_id'):
            session_id = unique_string('session_id')
        else:
            session_id = 'session_id'

        # Create temp column names for intermediate columns.
        new_partition = unique_string('new_partition')
        new_session = unique_string('new_session')

        # Name every template field explicitly rather than passing locals(),
        # so it is obvious which values end up in the statement.
        # NOTE(review): all of these values are interpolated into the SQL text
        # verbatim; callers are assumed to be trusted.
        create_sql = """
            CREATE {table_or_view} {output_table} AS
                SELECT
                    {cols_to_project},
                    CASE WHEN {time_stamp} IS NOT NULL
                    THEN SUM(CASE WHEN {new_partition} OR {new_session} THEN 1 END)
                        OVER (PARTITION BY {partition_expr} ORDER BY {time_stamp})
                    END AS {session_id}
                FROM (
                    SELECT *,
                        ROW_NUMBER() OVER (w) = 1 AND {time_stamp} IS NOT NULL AS {new_partition},
                        ({time_stamp}-LAG({time_stamp}, 1) OVER (w)) > '{max_time}'::INTERVAL AS {new_session}
                    FROM {source_table} WINDOW w AS (PARTITION BY {partition_expr} ORDER BY {time_stamp})
                ) a
            """.format(table_or_view=table_or_view,
                       output_table=output_table,
                       cols_to_project=cols_to_project,
                       time_stamp=time_stamp,
                       new_partition=new_partition,
                       new_session=new_session,
                       partition_expr=partition_expr,
                       session_id=session_id,
                       max_time=max_time,
                       source_table=source_table)
        try:
            plpy.execute(create_sql)
        except plpy.SPIError:
            # The specific exception we want to catch here is
            # "spiexceptions.DuplicateColumn", but not all platforms have it
            # defined. So catch the more generic exception and display a
            # hint; the message level is raised to "warning" so that the
            # hint is not filtered out by the enclosing "error" level.
            with MinWarning("warning"):
                plpy.warning("A plausible error condition: the output_cols "
                             "parameter might be an invalid SELECT expression, "
                             "resulting in duplicate column names.")
                # Re-raise so the caller still sees the original error.
                raise
def _validate(source_table, output_table, partition_expr, time_stamp, max_time):
    """
    Validate the arguments of sessionize before any SQL is generated.

    The checks run in this order: the source table, the output table, that
    partition_expr, time_stamp and max_time are each non-empty, and finally
    that partition_expr together with time_stamp is accepted by is_var_valid
    for the source table.

    Args:
        @param source_table: str, Name of the input table/view
        @param output_table: str, Name of the output table/view
        @param partition_expr: str, Partition expression
        @param time_stamp: str, Time stamp column name
        @param max_time: interval, Maximum gap between events in a session

    NOTE(review): failures are presumably reported as exceptions raised by
    the imported helpers (_assert, input_tbl_valid, output_tbl_valid) --
    confirm against their definitions.
    """
    module_name = 'Sessionization'
    input_tbl_valid(source_table, module_name)
    output_tbl_valid(output_table, module_name)

    # None and empty strings are both rejected for each required argument.
    required_args = (
        (partition_expr, "Sessionization error: Invalid partition expression"),
        (time_stamp, "Sessionization error: Invalid time stamp column"),
        (max_time, "Sessionization error: Invalid max time value"),
    )
    for arg_value, error_msg in required_args:
        _assert(arg_value, error_msg)

    # Make sure the partition/order expressions can actually be used on the
    # source table.
    expressions_usable = is_var_valid(source_table, partition_expr, time_stamp)
    _assert(expressions_usable,
            "Sessionization error: Invalid partition expression or time stamp column name")
def sessionize_help_message(schema_madlib, message, **kwargs):
    """
    Return the help text for the sessionize function.

    Args:
        @param schema_madlib: str, Name of the MADlib schema; substituted
                              into the example SQL calls
        @param message: str, Requested help topic:
                            None or ''            - summary
                            'usage', 'help', '?'  - usage (case-insensitive)
                            anything else         - "No such option" notice

    Returns:
        str, the selected help text
    """
    if not message:
        return """
        -----------------------------------------------------------------------------------
                                        SUMMARY
        -----------------------------------------------------------------------------------
        Functionality: Sessionize

        The MADlib sessionize function performs time-oriented session reconstruction on a
        data set comprising a sequence of events. A defined period of inactivity indicates
        the end of one session and beginning of the next session.

        For more details on function usage:
        SELECT {schema_madlib}.sessionize('usage');
        """.format(schema_madlib=schema_madlib)

    if message.lower() in ('usage', 'help', '?'):
        # Every argument except the last is followed by a comma so that the
        # listing mirrors a syntactically valid call.
        return """
        -----------------------------------------------------------------------------------
                                        USAGE
        -----------------------------------------------------------------------------------
        SELECT {schema_madlib}.sessionize(
            'source_table',    -- str, Name of the source table that contains the data to
                               -- be sessionized
            'output_table',    -- str, Name of the output view or table
            'partition_expr',  -- str, Partition expression to group the data table
            'time_stamp',      -- str, The time stamp column name that is used for
                               -- sessionization calculation
            'max_time',        -- interval, Delta time between subsequent events to define
                               -- a session
            'output_cols',     -- str, An optional valid postgres SELECT expression for the
                               -- output table/view (default *)
            'create_view'      -- boolean, Optional parameter to specify if output is a
                               -- view or materialized to a table (default True)
        );
        """.format(schema_madlib=schema_madlib)

    return """
        No such option. Use "SELECT {schema_madlib}.sessionize()" for help.
        """.format(schema_madlib=schema_madlib)