# coding=utf-8
"""
@file assoc_rules.py_in
@brief Association Rules - Apriori Algorithm Implementation.
@namespace assoc_rules
"""
import time
import plpy
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_exists
from utilities.validate_args import output_tbl_valid
from utilities.utilities import unique_string
from utilities.control import MinWarning
"""
@brief raise an error with the given message if the condition is false
@param condition the condition to be asserted
@param msg the error message to be reported
"""
def __assert(condition, msg):
if not condition :
plpy.error(msg);
"""
@brief test if the 1st value is greater than the 2nd value
@param val1 a float number
@param val2 a float number
@return true if the 1st value is greater than the 2nd value.
Otherwise return false
"""
def __float_gt(val1, val2):
return val1 - val2 > 1E-14;
"""
@brief test if the 1st value is less than or equal to the 2nd value
@param val1 a float number
@param val2 a float number
@return true if the 1st value is less than or equal to the 2nd value.
Otherwise return false
"""
def __float_le(val1, val2):
return (val1 - val2) < -1E-14 or abs(val1 - val2) < 1E-14;
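# Illustrative check of the tolerance semantics (values here are
# examples only, not part of the algorithm):
#   __float_gt(0.3, 0.1)       -> True   (difference 0.2 >> 1e-14)
#   __float_gt(0.1 + 0.2, 0.3) -> False  (difference ~5.6e-17 is within tolerance)
#   __float_le(0.1 + 0.2, 0.3) -> True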
"""
@brief The entry function for the Apriori.
@param support minimum level of support needed for each itemset
to be included in result
@param confidence minimum level of confidence needed for each rule
to be included in result
@param tid_col name of the column storing the transaction ids
@param item_col name of the column storing the products
@param input_table name of the table where the data is stored
@param output_schema name of the schema where the final results will be stored
@param verbose determines whether progress details are printed
@param max_itemset_size determines the maximum size of the frequent itemsets
used to generate association rules
@param max_lhs_size determines the maximum size of the lhs of the rule
@param max_rhs_size determines the maximum size of the rhs of the rule
"""
@MinWarning("warning")
def assoc_rules(madlib_schema, support, confidence, tid_col,
item_col, input_table, output_schema, verbose,
max_itemset_size, max_lhs_size, max_rhs_size):
begin_func_exec = time.time();
begin_step_exec = time.time();
cal_itemsets_time = 0;
if max_itemset_size is None:
max_itemset_size = 10
elif max_itemset_size <= 1:
plpy.error("ERROR: max_itemset_size has to be greater than 1.")
#Validate LHS RHS
__assert(max_lhs_size is None or max_lhs_size > 0,
"max_lhs_size param must be a positive number.")
__assert(max_rhs_size is None or max_rhs_size > 0,
"max_rhs_size param must be a positive number.")
#check parameters
__assert(
support is not None and
confidence is not None and
tid_col is not None and
item_col is not None and
input_table is not None and
verbose is not None,
"none of the parameters except the output_schema should be null"
);
__assert(
__float_gt(support, 0) and __float_le(support, 1),
"the parameter support must be in the range (0, 1]"
);
__assert(
__float_gt(confidence, 0) and __float_le(confidence, 1),
"the parameter confidence must be in the range (0, 1]"
);
__assert(len(input_table) > 0 and table_exists(input_table, madlib_schema) and
len(tid_col) > 0 and len(item_col) > 0 and
columns_exist_in_table(input_table, [tid_col, item_col], madlib_schema),
"the input table must exist and the tid_col and "
"item_col must be in the input table")
if output_schema is None :
rv = plpy.execute("SELECT current_schema() as s");
output_schema = rv[0]["s"];
else :
rv = plpy.execute("""
SELECT count(*) as cnt
FROM pg_namespace
where quote_ident(nspname) = '{0}'
""".format(output_schema)
);
__assert(rv[0]["cnt"] == 1, "the output schema does not exist");
if verbose:
plpy.info("finished checking parameters");
# create the result table for keeping the generated rules
output_tbl_valid("{0}.assoc_rules".format(output_schema), 'assoc rules')
plpy.execute("""
CREATE TABLE {0}.assoc_rules
(
ruleId INT,
pre TEXT[],
post TEXT[],
count INT,
support FLOAT8,
confidence FLOAT8,
lift FLOAT8,
conviction FLOAT8
)
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (ruleId)')""".format(output_schema)
);
# the auxiliary table for keeping the association rules
assoc_rules_aux_tmp = unique_string('assoc_rules_aux_tmp')
plpy.execute("""
CREATE TEMP TABLE {0}
(
ruleId SERIAL,
pre TEXT,
post TEXT,
support FLOAT8,
confidence FLOAT8,
lift FLOAT8,
conviction FLOAT8
)
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (ruleId)')
""".format(assoc_rules_aux_tmp));
# if the tids in the input table don't start with 1 or are not
# continuous, remap them to a dense sequence starting at 1; otherwise
# just deduplicate. note: duplicate records and NULLs are removed.
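# Illustrative example (hypothetical data): tids (3, 7, 7, 9) are first
# deduplicated per (tid, item) by the GROUP BY, then dense_rank() maps
# the remaining tids 3, 7, 9 to 1, 2, 3.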
rv = plpy.execute("""
SELECT max({0}) as x, min({0}) as i, count(DISTINCT {0}) as c
FROM {1}""".format(tid_col, input_table));
assoc_input_unique = unique_string('assoc_input_unique')
if rv[0]["i"] != 1 or rv[0]["x"] != rv[0]["c"] :
plpy.execute("""
CREATE TEMP TABLE {5} AS
SELECT
dense_rank() OVER (ORDER BY tid)::BIGINT as tid,
item
FROM (
SELECT {0} as tid,
{1}::text as item
FROM {2}
WHERE {3} IS NOT NULL AND
{4} IS NOT NULL
GROUP BY 1,2
) t
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (tid)')
""".format(tid_col, item_col, input_table, tid_col, item_col,
assoc_input_unique)
);
else :
plpy.execute("""
CREATE TEMP TABLE {3} AS
SELECT {0}::BIGINT as tid,
{1}::text as item
FROM {2}
WHERE {0} IS NOT NULL AND
{1} IS NOT NULL
GROUP BY 1,2
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (tid)')
""".format(tid_col, item_col, input_table, assoc_input_unique)
);
if verbose :
plpy.info("finished removing duplicates");
rv = plpy.execute("""
SELECT count(DISTINCT tid) as c1, count(DISTINCT item) as c2
FROM {assoc_input_unique}
""".format(**locals()));
num_tranx = rv[0]["c1"];
num_prod = rv[0]["c2"];
min_supp_tranx = float(num_tranx) * support;
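# Worked example (hypothetical numbers): with num_tranx = 1000 and
# support = 0.1, min_supp_tranx = 100, i.e. an itemset must occur in at
# least 100 transactions to be frequent.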
# Set the defaults to the maximum possible number of items an itemset
# can have on its LHS or RHS for the given input dataset.
max_lhs_size = num_prod if max_lhs_size is None else max_lhs_size
max_rhs_size = num_prod if max_rhs_size is None else max_rhs_size
# keep the items whose counts meet the minimum support count.
# Each item is assigned a contiguous ID, ordered by descending count.
assoc_item_uniq = unique_string('assoc_item_uniq')
plpy.execute("""
CREATE TEMP TABLE {assoc_item_uniq} (item_id, item_text, cnt) AS
SELECT row_number() OVER (ORDER BY cnt DESC)::BIGINT,
item::TEXT,
cnt::FLOAT8
FROM (
SELECT item AS item, count(tid) as cnt
FROM {assoc_input_unique}
GROUP BY item
) t
WHERE cnt::FLOAT8 >= {min_supp_tranx}
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (item_id)')
""".format(**locals()));
if verbose :
plpy.info("finished encoding items");
rv = plpy.execute("SELECT count(item_id) as c FROM {assoc_item_uniq}".format(**locals()));
num_supp_prod = rv[0]["c"];
assoc_enc_input = unique_string('assoc_enc_input')
plpy.execute("""
CREATE TEMP TABLE {assoc_enc_input} (tid, item, cnt) AS
SELECT t1.tid, t2.item_id, t2.cnt
FROM {assoc_input_unique} t1, {assoc_item_uniq} t2
WHERE t1.item = t2.item_text
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (tid)')
""".format(**locals()));
if verbose :
plpy.info("finished encoding input table: {0}".format(
time.time() - begin_step_exec));
plpy.info("Beginning iteration #1");
begin_step_exec = time.time();
# this table keeps the patterns in each iteration.
assoc_rule_sets = unique_string('assoc_rule_sets')
plpy.execute("""
CREATE TEMP TABLE {assoc_rule_sets}
(
text_svec TEXT,
set_list {madlib_schema}.svec,
support FLOAT8,
iteration INT
)
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (text_svec)')
""".format(**locals()));
# this table keeps the patterns in the current iteration.
assoc_rule_sets_loop = unique_string('assoc_rule_sets_loop')
plpy.execute("""
CREATE TEMP TABLE {assoc_rule_sets_loop}
(
id BIGINT,
set_list {madlib_schema}.svec,
support FLOAT8,
tids {madlib_schema}.svec
)
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (id)')
""".format(**locals()));
# this table adds a new column that orders the rows by their set_list
assoc_rule_sets_loop_ordered = unique_string('assoc_rule_sets_loop_ordered')
plpy.execute("""
CREATE TEMP TABLE {assoc_rule_sets_loop_ordered}
(
id BIGINT,
set_list {madlib_schema}.svec,
support FLOAT8,
tids {madlib_schema}.svec,
newrownum BIGINT
)
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (newrownum)')
""".format(**locals()));
plpy.execute("""
INSERT INTO {assoc_rule_sets_loop} (id, set_list, support, tids)
SELECT
t.item,
{madlib_schema}.svec_cast_positions_float8arr
(ARRAY[t.item], ARRAY[1], {num_supp_prod}, 0),
t.cnt::FLOAT8 / {num_tranx},
{madlib_schema}.svec_cast_positions_float8arr
(array_agg(t.tid), array_agg(1), {num_tranx}, 0)
FROM {assoc_enc_input} t
GROUP BY t.item, t.cnt
""".format(**locals()));
plpy.execute("""
INSERT INTO {assoc_rule_sets_loop_ordered}(id, set_list, support,
tids, newrownum)
SELECT *, row_number() over (order by set_list) as newrownum
FROM {assoc_rule_sets_loop}
""".format(**locals()));
rv = plpy.execute("""
SELECT count(id) as c1, max(id) as c2
FROM {assoc_rule_sets_loop}
""".format(**locals()));
num_item_loop = rv[0]["c1"];
max_item_loop = rv[0]["c2"];
__assert(
(num_item_loop == max_item_loop) or
(num_item_loop == 0 and max_item_loop is None),
"internal error: num_item_loop must be equal to max_item_loop"
);
# As two different svecs may have the same hash key,
# we use this table to assign a unique ID for each svec.
assoc_loop_aux = unique_string('al_aux')
plpy.execute("""
CREATE TEMP TABLE {assoc_loop_aux}
(
id BIGSERIAL,
set_list {madlib_schema}.svec,
support FLOAT8,
tids {madlib_schema}.svec
)
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (id)')
""".format(**locals()));
if verbose :
plpy.info("{0} Frequent itemsets found in this iteration".format(
num_supp_prod));
plpy.info("Completed iteration # 1. Time: {0}".format(
time.time() - begin_step_exec));
curr_iter = 0;
num_products_threshold = num_supp_prod
while num_item_loop > 0 and curr_iter < max_itemset_size:
begin_step_exec = time.time();
curr_iter = curr_iter + 1;
if verbose :
plpy.info("Beginning iteration # {0}".format(curr_iter + 1));
plpy.execute("""
INSERT INTO {assoc_rule_sets}
(text_svec, set_list, support, iteration)
SELECT array_to_string(
{madlib_schema}.svec_nonbase_positions(set_list, 0), ','),
set_list,
support, {curr_iter}
FROM {assoc_rule_sets_loop}""".format(**locals()));
if verbose :
plpy.info("time of preparing data: {0}".format(
time.time() - begin_step_exec));
# generate the patterns for the next iteration
# The vector operations are used to calculate the boolean OR operation
# efficiently.
# Example:
# a1 = [1,1,0,0,0,1] a2 = [1,1,0,0,1,1]
# svec_a1 = {2,3,1}:{1,0,1} svec_a2 = {2,2,2}:{1,0,1}
# x = svec_a1 + svec_a2 = [2,2,0,0,1,2] -> {2,2,1,1}:{2,0,1,2}
# y = svec_a1 * svec_a2 = [1,1,0,0,0,1] -> {2,3,1}:{1,0,1}
# x - y = [1,1,0,0,1,1] -> {2,2,2}:{1,0,1}
# The boolean OR operation is used to combine two itemsets to create a larger one.
# (a,b,c) combined with (a,b,d) will give (a,b,c,d)
# (a,b,c) combined with (c,d,e) will give (a,b,c,d,e)
# The {assoc_rule_sets_loop_ordered} table is ordered by the setlist.
# This ensures that the sets (a,b,c) and (a,b,d) are close to each other.
# t3.newrownum-t1.newrownum <= {num_products_threshold} check ensures that we are
# looking at a subset of pairs of itemsets instead of all pairs.
# Let's assume we are trying to get to (a,b,c,d).
# If the support for (a,b,c,d) is above the threshold, then all of its
# 3-subsets (a,b,c), (a,b,d), (a,c,d), (b,c,d) must already be in our
# list (the Apriori property).
# In this case it doesn't matter if we skip trying the (a,b,c) and (b,c,d)
# combination, because the result is already covered by, e.g.,
# (a,b,c) + (a,b,d).
# The iterp1 check ensures that the combined itemset has exactly
# curr_iter + 1 items; at every iteration, we increase the target
# itemset size by one.
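# The tids svecs make the support count of a combined itemset cheap:
# svec_mult of two transaction-indicator vectors is an element-wise AND,
# and svec_l1norm counts the surviving 1s. Illustrative example:
#   t1.tids = [1,0,1,1], t3.tids = [1,1,0,1]
#   svec_mult(t1.tids, t3.tids) = [1,0,0,1] -> l1norm = 2, i.e. two
#   transactions contain both itemsets.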
plpy.execute("ALTER SEQUENCE {assoc_loop_aux}_id_seq RESTART WITH 1".format(**locals()));
plpy.execute("TRUNCATE TABLE {assoc_loop_aux}".format(**locals()));
plpy.execute("""
INSERT INTO {assoc_loop_aux}(set_list, support, tids)
SELECT DISTINCT ON({madlib_schema}.svec_to_string(set_list)) set_list,
{madlib_schema}.svec_l1norm(tids)::FLOAT8 / {num_tranx},
tids
FROM (
SELECT
{madlib_schema}.svec_minus(
{madlib_schema}.svec_plus(t1.set_list, t3.set_list),
{madlib_schema}.svec_mult(t1.set_list, t3.set_list)
) as set_list,
{madlib_schema}.svec_mult(t1.tids, t3.tids) as tids
FROM {assoc_rule_sets_loop_ordered} t1,
{assoc_rule_sets_loop_ordered} t3
WHERE t1.newrownum < t3.newrownum AND
t3.newrownum-t1.newrownum <= {num_products_threshold}
) t
WHERE {madlib_schema}.svec_l1norm(set_list)::INT = {iterp1} AND
{madlib_schema}.svec_l1norm(tids)::FLOAT8 >= {min_supp_tranx}
""".format(iterp1=curr_iter + 1, **locals())
);
plpy.execute("TRUNCATE TABLE {assoc_rule_sets_loop}".format(**locals()));
plpy.execute("""
INSERT INTO {assoc_rule_sets_loop}(id, set_list, support, tids)
SELECT * FROM {assoc_loop_aux}
""".format(**locals()));
plpy.execute("TRUNCATE TABLE {assoc_rule_sets_loop_ordered}".format(**locals()));
plpy.execute("""
INSERT INTO {assoc_rule_sets_loop_ordered}(id, set_list, support, tids, newrownum)
SELECT *, row_number() over (order by set_list) as newrownum
FROM {assoc_rule_sets_loop}
""".format(**locals()));
rv = plpy.execute("""
SELECT count(id) as c1, max(id) as c2
FROM {assoc_rule_sets_loop}
""".format(**locals()));
num_item_loop = rv[0]["c1"];
max_item_loop = rv[0]["c2"];
__assert(
(num_item_loop == max_item_loop) or
(num_item_loop == 0 and max_item_loop is None),
"internal error: num_item_loop must be equal to max_item_loop"
);
# num_products_threshold should be equal to the number of distinct
# products that are present in the previous iteration's frequent
# itemsets.
# We can ideally do away with the if condition, but it's a trade-off:
# The query in the if condition might be a considerable overhead when
# the original number of products is very low compared to the number
# of rows in {assoc_rule_sets_loop} in a given iteration. On the other
# hand, it can be a considerable improvement if we have a lot of
# distinct products and only a small number of them are present in
# frequent itemsets. But this is specific to datasets and parameters
# (such as support), so the following if statement is a compromise.
if num_item_loop < num_supp_prod:
# Get number of 1's from all set_lists in {assoc_rule_sets_loop}
num_products_threshold = plpy.execute("""
SELECT {madlib_schema}.svec_l1norm(
{madlib_schema}.svec_count_nonzero(a)) AS cnt
FROM (
SELECT {madlib_schema}.svec_sum(set_list) AS a
FROM {assoc_rule_sets_loop}
) t
""".format(**locals()))[0]['cnt']
if verbose :
plpy.info("{0} Frequent itemsets found in this iteration".format(
num_item_loop));
plpy.info("Completed iteration # {0}. Time: {1}".format(
curr_iter + 1, time.time() - begin_step_exec));
cal_itemsets_time = cal_itemsets_time + (time.time() - begin_step_exec);
if (num_item_loop == 0) and (curr_iter < 2) and verbose :
plpy.info("No association rules found that meet given criteria");
plpy.info("finished itemsets finding. Time: {0}".format(
cal_itemsets_time));
total_rules = 0;
else :
if (verbose) :
plpy.info("begin to generate the final rules");
begin_step_exec = time.time();
# generate all the final rules
plpy.execute("""
INSERT INTO {assoc_rules_aux_tmp}
(pre, post, support, confidence, lift, conviction)
SELECT
t.item[1],
t.item[2],
t.support_xy,
t.support_xy / x.support,
t.support_xy / (x.support * y.support),
CASE WHEN abs(t.support_xy / x.support - 1) < 1.0E-10 THEN
0
ELSE
(1 - y.support) / (1 - (t.support_xy / x.support))
END
FROM (
SELECT
{madlib_schema}.gen_rules_from_cfp(text_svec,
iteration,
{max_lhs_size},
{max_rhs_size}) as item,
support as support_xy
FROM {assoc_rule_sets}
WHERE iteration > 1
) t, {assoc_rule_sets} x, {assoc_rule_sets} y
WHERE t.item[1] = x.text_svec AND
t.item[2] = y.text_svec AND
(t.support_xy / x.support) >= {confidence}
""".format(**locals()));
# generate the readable rules
pre_tmp_table = unique_string('pre_tmp_table')
plpy.execute("""
CREATE TEMP TABLE {pre_tmp_table} AS
SELECT ruleId, array_agg(item_text) as pre
FROM
(
SELECT
ruleId,
unnest(string_to_array(pre, ','))::BIGINT as pre_id
FROM {assoc_rules_aux_tmp}
) s1, {assoc_item_uniq} s2
WHERE s1.pre_id = s2.item_id
GROUP BY ruleId
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (ruleId)')
""".format(**locals()));
post_tmp_table = unique_string('post_tmp_table')
plpy.execute("""
CREATE TEMP TABLE {post_tmp_table} AS
SELECT ruleId, array_agg(item_text) as post
FROM
(
SELECT
ruleId,
unnest(string_to_array(post, ','))::BIGINT as post_id
FROM {assoc_rules_aux_tmp}
) s1, {assoc_item_uniq} s2
WHERE s1.post_id = s2.item_id
GROUP BY ruleId
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (ruleId)')
""".format(**locals()));
plpy.execute("""
INSERT INTO {output_schema}.assoc_rules
SELECT t1.ruleId, t2.pre, t3.post, t1.support*{num_tranx}::INT AS count, support,
confidence, lift, conviction
FROM
{assoc_rules_aux_tmp} t1,
{pre_tmp_table} t2,
{post_tmp_table} t3
WHERE t1.ruleId = t2.ruleId AND t1.ruleId = t3.ruleId
""".format(**locals()));
# if in verbose mode, we will keep all the intermediate tables
if not verbose :
plpy.execute("""
DROP TABLE IF EXISTS {post_tmp_table};
DROP TABLE IF EXISTS {pre_tmp_table};
DROP TABLE IF EXISTS {assoc_rules_aux_tmp};
DROP TABLE IF EXISTS {assoc_input_unique};
DROP TABLE IF EXISTS {assoc_item_uniq};
DROP TABLE IF EXISTS {assoc_enc_input};
DROP TABLE IF EXISTS {assoc_rule_sets};
DROP TABLE IF EXISTS {assoc_rule_sets_loop};
DROP TABLE IF EXISTS {assoc_rule_sets_loop_ordered};
DROP TABLE IF EXISTS {assoc_loop_aux};
""".format(**locals()));
if verbose :
rv = plpy.execute("""
SELECT count(*) as c FROM {0}.assoc_rules
""".format(output_schema));
num_item_loop = rv[0]["c"];
plpy.info("{0} Total association rules found. Time: {1}".format(
num_item_loop, time.time() - begin_step_exec));
rv = plpy.execute("""
SELECT count(*) as c FROM {0}.assoc_rules
""".format(output_schema));
total_rules = rv[0]["c"];
return (
output_schema,
'assoc_rules',
total_rules,
time.time() - begin_func_exec
);
def assoc_rules_help_message(schema_madlib, message=None, **kwargs):
"""
Given a help string, provide usage information
Args:
@param schema_madlib Name of the MADlib schema
@param message Helper message to print
Returns:
None
"""
if message is not None and \
message.lower() in ("usage", "help", "?"):
return """
-----------------------------------------------------------------------
USAGE
-----------------------------------------------------------------------
SELECT {schema_madlib}.assoc_rules(
support, -- FLOAT8, minimum level of support needed for each
itemset to be included in result
confidence, -- FLOAT8, minimum level of confidence needed for each
rule to be included in result
tid_col, -- TEXT, name of the column storing the transaction ids
item_col, -- TEXT, name of the column storing the products
input_table, -- TEXT, name of the table containing the input data
output_schema, -- TEXT, name of the schema where the final results
will be stored. The schema must be created
before calling the function.
Alternatively, use <tt>NULL</tt> to output
to the current schema.
verbose, -- BOOLEAN, (optional, default: False) determines if
details are printed for each iteration as
the algorithm progresses
max_itemset_size, -- INTEGER, (optional, default: 10) determines the
maximum size of the frequent itemsets used for
generating association rules. A value less
than 2 throws an error.
max_lhs_size, -- INTEGER, (optional, default: NULL) determines the
maximum size of the lhs of the rule.
NULL means there is no restriction on the
size of the left hand side.
max_rhs_size -- INTEGER (optional, default: NULL) determines the
maximum size of the rhs of the rule.
NULL means there is no restriction on the
size of the right hand side.
);
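-------------------------------------------------------------------------
EXAMPLE
-------------------------------------------------------------------------
-- A minimal sketch; 'test_data' is a hypothetical input table with
-- columns trans_id (INT) and product (TEXT) that you create and
-- populate yourself:
SELECT * FROM {schema_madlib}.assoc_rules(
.25, -- support
.5, -- confidence
'trans_id', -- tid_col
'product', -- item_col
'test_data', -- input_table
NULL, -- output_schema (NULL: current schema)
TRUE -- verbose
);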
-------------------------------------------------------------------------
OUTPUT TABLES
-------------------------------------------------------------------------
The output table "assoc_rules" in the "output_schema" contains one rule of the form "If X, then Y
(i.e., X => Y)" per row. X and Y are non-empty itemsets, called the antecedent and consequent, or
the left-hand-side (LHS) and right-hand-side (RHS), of the rule respectively.
The table has the following columns:
ruleid, -- INTEGER, rule identifier
pre, -- TEXT[], specifies the antecedent, or the LHS of the rule
post, -- TEXT[], specifies the consequent, or the RHS of the rule
count, -- INTEGER, number of transactions in the input table that contain X,Y
support, -- DOUBLE, support of the frequent itemset X,Y
confidence, -- DOUBLE, the ratio of the number of transactions that contain X,Y to the number of
transactions that contain X
lift, -- DOUBLE, the ratio of the observed support of X,Y to the expected support of X,Y, assuming
X and Y are independent
conviction -- DOUBLE, the ratio of the expected support of X occurring without Y, assuming X and Y
are independent, to the observed support of X occurring without Y
""".format(schema_madlib=schema_madlib)
else:
return """
For an overview on usage, run: SELECT {schema_madlib}.assoc_rules('usage');
""".format(schema_madlib=schema_madlib)