# coding=utf-8
"""@file bayes.py_in
@brief Naive Bayes classification with user-defined smoothing factor (default:
Laplacian smoothing).
@namespace bayes
Naive Bayes: Setup Functions
@internal
@implementation
For Naive Bayes classification, we need a product over probabilities.
However, multiplying many small numbers can lead to an exponent underflow.
E.g., multiplying more than 324 numbers that are each at most 0.1 yields a
product of 0 in machine arithmetic. A safer approach is therefore to sum
logarithms.
By the IEEE 754 standard, the smallest normal number representable as
DOUBLE PRECISION (64 bit) is \f$ 2^{-1022} \f$, i.e., approximately 2.225e-308.
See, e.g., http://en.wikipedia.org/wiki/Double_precision
Hence, \f$ \log_{10}(x) \ge -308 \f$ (approximately) for any non-zero
DOUBLE PRECISION \f$ x \f$.
Note for theorists:
- Even adding arbitrarily many terms \f$ \log_{10}(x) \f$ for \f$ 0 < x \le 1 \f$
  will never cause an overflow, because addition has no effect once the sum
  reaches approximately \f$ 308 \cdot 2^{53} \f$ (corresponding to the machine
  precision).
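As a quick illustration of why we sum logarithms (a minimal sketch, not used
anywhere in this module):
@verbatim
import math
prod = 1.0
for _ in range(400):
    prod *= 0.1        # underflows to 0.0 after roughly 324 factors
# summing base-10 logs instead stays a perfectly ordinary -400.0:
log_sum = sum(math.log10(0.1) for _ in range(400))
@endverbatim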
The functions __get_*_sql are private because we do not want to commit ourselves
to a particular interface. We might want to be able to change implementation
details should the need arise.
@endinternal
"""
import plpy
from utilities.validate_args import table_exists
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_is_empty
def __nb_validate_args(**kwargs):
"""Validate parameters for Naive Bayes functions.
    This function performs general checks for all Naive Bayes functions;
    an argument is assumed valid if it is not present in **kwargs.
Possible keys in **kwargs:
@param trainingSource name of relation containing training data
@param trainingClassColumn name of column with class
@param trainingAttrColumn name of column with attributes array
    @param numericAttrsColumnIndices Array of 1-based indices of attributes (in the attributes array) that are to be treated as numeric
@param numAttrs Number of attributes to use for classification
@param numericFeatureStatsDestName Name of relation containing the descriptive statistics
(mean,var) of the numerical features
@param classPriorsDestName Name of class-priors relation to create
@param featureProbsDestName Name of feature-probabilities relation to create
@param classPriorsSource Relation (class, class_cnt, all_cnt) where
class is c, class_cnt is \#c, all_cnt is the number of rows in
\em trainingSource
@param featureProbsSource Relation (class, attr, value, cnt, attr_cnt) where
(class, attr, value) = (c,i,a), cnt = \#(c,i,a), and attr_cnt = \#i
    @param numericFeatureStatsSource Relation (class, attr, attr_mean, attr_var)
           where attr_mean is the mean of the numeric attr and attr_var is its variance, used for Gaussian smoothing
@param classifySource Name of the relation that contains data to be classified
@param classifyKeyColumn Name of column in \em classifySource that can
serve as unique identifier
@param classifyAttrColumn Name of attributes-array column in \em classifySource
@param destName Name of the table or view to create
"""
for k in ('classPriorsDestName', 'featureProbsDestName', 'destName','numericFeatureStatsDestName'):
if k in kwargs:
out = kwargs[k]
if out is None or out == '':
plpy.error("Naive Bayes error: Invalid output table name!")
if table_exists(out, only_first_schema=True):
plpy.error("Naive Bayes error: Output table already exists. " + \
"Drop table '{0}' before calling the function.".format(out))
if 'numAttrs' in kwargs:
n_attr = kwargs['numAttrs']
if n_attr is None or n_attr <= 0:
plpy.error("Naive Bayes error: 'numAttrs' should be > 0!")
model_tbl = {'classPriorsSource': ('class', 'class_cnt', 'all_cnt'),
'featureProbsSource': ('class', 'attr', 'value', 'cnt', 'attr_cnt'),
'numericFeatureStatsSource' : ('class','attr','attr_mean','attr_var')}
empty_tables = []
for mdl in model_tbl:
if mdl in kwargs:
tbl = kwargs[mdl]
if tbl is None or tbl == '':
plpy.error("Naive Bayes error: Invalid {0} name!".format(mdl))
if not table_exists(tbl):
plpy.error("Naive Bayes error: " + \
"{0} '{1}' does not exist!".format(mdl, tbl))
if table_is_empty(tbl):
empty_tables += [mdl]
for c in model_tbl[mdl]:
if c is None or c == '':
plpy.error("Naive Bayes error: Invalid column name!")
if not columns_exist_in_table(tbl, model_tbl[mdl]):
plpy.error("Naive Bayes error: {0} '{1}' does not have the" + \
"necessary column(s)!".format(mdl, tbl))
# classPriorsSource should never be empty
if 'classPriorsSource' in empty_tables:
plpy.error("Naive Bayes error: " + \
"{0} '{1}' is empty!".format('classPriorsSource', kwargs['classPriorsSource']))
#at most one of the other tables can be empty
if 'featureProbsSource' in empty_tables and 'numericFeatureStatsSource' in empty_tables:
plpy.error("Naive Bayes error: " + \
"{0} '{1}' and {2} '{3}' are empty!".format(
'featureProbsSource',
kwargs['featureProbsSource'],
'numericFeatureStatsSource',
kwargs['numericFeatureStatsSource']))
if 'trainingSource' in kwargs:
tbl = kwargs['trainingSource']
c_col = kwargs['trainingClassColumn']
a_col = kwargs['trainingAttrColumn']
n_attr = kwargs['numAttrs']
if tbl is None or tbl == '':
plpy.error("Naive Bayes error: Invalid training table name!")
if not table_exists(tbl):
plpy.error("Naive Bayes error: " + \
"Training table '{0}' does not exist!".format(tbl))
if table_is_empty(tbl):
plpy.error("Naive Bayes error: " + \
"Training table '{0}' is empty!".format(tbl))
for c in (a_col, c_col):
if c is None or c == '':
plpy.error("Naive Bayes error: Invalid column name!")
if not columns_exist_in_table(tbl, [c]):
plpy.error("Naive Bayes error: " + \
"Column '{0}' does not exist!".format(c))
bound = plpy.execute("""SELECT {n_attr} <= array_upper({a_col}, 1)
AS bound FROM {tbl} LIMIT 1""".format(n_attr=str(n_attr),
a_col=a_col, tbl=tbl))[0]['bound']
if not bound:
plpy.error("Naive Bayes error:" + \
"'numAttrs' {0} is out of bound!".format(str(n_attr)))
if 'classifySource' in kwargs:
tbl = kwargs['classifySource']
k_col = kwargs['classifyKeyColumn']
a_col = kwargs['classifyAttrColumn']
n_attr = kwargs['numAttrs']
if tbl is None or tbl == '':
plpy.error("Naive Bayes error: Invalid classify table name!")
if not table_exists(tbl):
plpy.error("Naive Bayes error: " + \
"Classify table '{0}' does not exist!".format(tbl))
if table_is_empty(tbl):
plpy.error("Naive Bayes error: " + \
"Classify table '{0}' is empty!".format(tbl))
for c in (a_col, k_col):
if c is None or c == '':
plpy.error("Naive Bayes error: Invalid column name!")
if not columns_exist_in_table(tbl, [c]):
plpy.error("Naive Bayes error: " + \
"Column '{0}' does not exist!".format(c))
bound = plpy.execute("""SELECT {n_attr} <= array_upper({a_col}, 1)
AS bound FROM {tbl} LIMIT 1""".format(n_attr=str(n_attr),
a_col=a_col, tbl=tbl))[0]['bound']
if not bound:
plpy.error("Naive Bayes error:" + \
"'numAttrs' {0} is out of bound!".format(str(n_attr)))
def __get_numeric_attr_indices(numericFeatureStatsSource):
""" Return the SQL array containing the indices of the
numeric attributes in the numericFeatureStatsSource
table
"""
return """
SELECT array_agg(DISTINCT(attr))
FROM {0} as numericFeatureStatsSource
""".format(numericFeatureStatsSource)
def __get_feature_probs_sql(**kwargs):
"""Return SQL query with columns (class, attr, value, cnt, attr_cnt).
    For class c, attr i, and value a, cnt is \#(c,i,a) and attr_cnt is \#i.
    Note that the query will contain a row for every pair (class, value)
    occurring in the training data (so it might also contain rows where
    \#(c,i,a) = 0).
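    As an illustration of this notation (a hypothetical toy data set, not part
    of this module), consider three training rows:
    @verbatim
      class   attributes
      spam    {a, x}
      spam    {b, x}
      ham     {a, y}
    @endverbatim
    Then \#spam = 2, \#1 = \#2 = 2 (each attribute takes two distinct values),
    \#(spam,1,a) = 1, \#(spam,2,x) = 2, and the query also reports the
    unobserved combination (ham, 2, x) with cnt = 0.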
@param classPriorsSource Relation (class, class_cnt, all_cnt) where
class is c, class_cnt is \#c, all_cnt is the number of rows in
\em trainingSource
@param attrValuesSource Relation (attr, value) containing all distinct
attribute, value pairs. If omitted, will use __get_attr_values_sql()
@param attrCountsSource Relation (attr, attr_cnt) where attr is i and
attr_cnt is \#i. If omitted, will use __get_attr_counts_sql()
@param trainingSource name of relation containing training data
@param trainingClassColumn name of column with class
@param trainingAttrColumn name of column with attributes array
@param numAttrs Number of attributes to use for classification
@param numericAttrsColumnIndices Indices of the attributes which are numeric
and hence must be removed from the feature probs table
For meanings of \#(c,i,a), \#c, and \#i see the general description of
\ref bayes.
"""
if not 'attrValuesSource' in kwargs:
kwargs.update(dict(
attrValuesSource = "(" + __get_attr_values_sql(**kwargs) + ")"
))
if not 'attrCountsSource' in kwargs:
kwargs.update(dict(
attrCountsSource = "(" + __get_attr_counts_sql(**kwargs) + ")"
))
# {trainingSource} cannot be a subquery, because we use it more than once in
# our generated SQL.
sql = """
SELECT
class,
attr,
value,
cnt,
attr_cnt
FROM(
SELECT
class,
attr,
value,
coalesce(cnt, 0) AS cnt,
attr_cnt
FROM
(
SELECT *
FROM
{classPriorsSource} AS classes
CROSS JOIN
{attrValuesSource} AS attr_values
) AS required_triples
LEFT OUTER JOIN
(
SELECT
trainingSource.{trainingClassColumn} AS class,
generate_series(1, {numAttrs}) AS attr,
unnest(trainingSource.{trainingAttrColumn}) AS value,
count(*) AS cnt
FROM
{trainingSource} AS trainingSource
WHERE
{schema_madlib}.array_contains_null({trainingAttrColumn}) = 'f'
GROUP BY
class,
attr,
value
) AS triple_counts
USING (class, attr, value)
INNER JOIN
{attrCountsSource} AS attr_counts
USING (attr)
) p
"""
if kwargs.has_key("numericAttrsColumnIndices") \
and kwargs['numericAttrsColumnIndices'].lower() != 'array[]':
sql += """
WHERE NOT EXISTS (
SELECT 1
FROM(
SELECT unnest({numericAttrsColumnIndices}::INTEGER[]) as attr
) q
WHERE p.attr=q.attr
)
"""
return sql.format(**kwargs)
def __get_attr_values_sql(**kwargs):
"""
Return SQL query with columns (attr, value).
The query contains a row for each pair that occurs in the training data.
@param trainingSource Name of relation containing the training data
@param trainingAttrColumn Name of attributes-array column in training data
@param numAttrs Number of attributes to use for classification
@internal
\par Implementation Notes:
If PostgreSQL supported count(DISTINCT ...) for window functions, we could
consolidate this function with __get_attr_counts_sql():
@verbatim
[...] count(DISTINCT value) OVER (PARTITION BY attr) [...]
@endverbatim
@endinternal
"""
return """
SELECT attr, value
FROM(
SELECT
generate_series(1, {numAttrs}) AS attr,
unnest(trainingSource.{trainingAttrColumn}) AS value
FROM
{trainingSource} AS trainingSource
WHERE
{schema_madlib}.array_contains_null({trainingAttrColumn}) = 'f'
GROUP BY attr, value
) l
WHERE value IS NOT NULL
""".format(**kwargs)
def __get_attr_counts_sql(**kwargs):
"""
Return SQL query with columns (attr, attr_cnt)
    For attr i, attr_cnt is \#i, i.e., the number of distinct values observed
    for attribute i in the training data.
@param trainingSource Name of relation containing the training data
@param trainingAttrColumn Name of attributes-array column in training data
@param numAttrs Number of attributes to use for classification
"""
return """
SELECT
attr, count(value) AS attr_cnt
FROM
(
SELECT
attr, value
FROM (
SELECT
generate_series(1, {numAttrs}) AS attr,
unnest(trainingSource.{trainingAttrColumn}) AS value
FROM
{trainingSource} AS trainingSource
WHERE
{schema_madlib}.array_contains_null({trainingAttrColumn}) = 'f'
) l
GROUP BY attr, value
) m
GROUP BY attr
""".format(**kwargs)
def __get_attr_numeric_stats_sql(**kwargs):
"""
Return SQL query with columns (class,attr, attr_mean,attr_var) for
numeric attributes
@param trainingSource Name of relation containing the training data
@param trainingClassColumn Name of the class column in the training source table
@param trainingAttrColumn Name of attributes-array column in training data
@param numAttrs Number of attributes to use for classification
@param numericAttrsColumnIndices Indices of attributes in trainingAttrColumn array to be treated as numeric
"""
return """
SELECT
class,attr, avg(value) AS attr_mean , variance(value) as attr_var
FROM
(
SELECT
l.class,l.attr,l.value
FROM (
SELECT
trainingSource.{trainingClassColumn} as class,
generate_series(1, {numAttrs}) AS attr,
unnest(trainingSource.{trainingAttrColumn}) AS value
FROM
{trainingSource} AS trainingSource
WHERE
{schema_madlib}.array_contains_null({trainingAttrColumn}) = 'f'
) l
INNER JOIN (
SELECT
unnest({numericAttrsColumnIndices}::INTEGER[]) AS attr
) m
ON (l.attr=m.attr)
) p
GROUP BY class,attr
""".format(**kwargs)
def __get_class_priors_sql(**kwargs):
"""
Return SQL query with columns (class, class_cnt, all_cnt)
For class c, class_cnt is \#c. all_cnt is the total number of records in the
training data.
@param trainingSource Name of relation containing the training data
@param trainingClassColumn Name of class column in training data
"""
return """
SELECT * FROM
(
SELECT
trainingSource.{trainingClassColumn} AS class,
count(*) AS class_cnt
FROM {trainingSource} AS trainingSource
WHERE
{schema_madlib}.array_contains_null({trainingAttrColumn}) = 'f'
GROUP BY trainingSource.{trainingClassColumn}
) l
CROSS JOIN
(
SELECT
count(*) AS all_cnt
FROM {trainingSource} AS trainingSource
WHERE
{schema_madlib}.array_contains_null({trainingAttrColumn}) = 'f'
) m
""".format(**kwargs)
def __get_class_prior_prob_values_sql(**kwargs):
"""
    Return SQL query with columns (key, class, log_prob), where log_prob
    is the logarithm of the class prior probability. The query returns all
    possible (key, class) combinations so that they can later be grouped
    together and added to the log probabilities of the attributes.
@param numAttrs Number of attributes to use for classification
@param classifySource Name of the relation that contains data to be classified
@param classifyKeyColumn Name of column in \em classifySource that can
serve as unique identifier
@param classifyAttrColumn Name of attributes-array column in \em classifySource
@param classPriorsSource
Relation (class, class_cnt, all_cnt) where
class is c, class_cnt is \#c, all_cnt is the number of training
samples.
"""
return """
SELECT
classify.{classifyKeyColumn} AS key,
classes.class,
        log(classes.class_cnt::DOUBLE PRECISION / classes.all_cnt) AS log_prob
FROM
{classifySource} AS classify,
{classPriorsSource} AS classes
WHERE
{schema_madlib}.array_contains_null({classifyAttrColumn}) = 'f'
""".format(**kwargs)
def __get_keys_and_prob_values_sql(**kwargs):
"""
    Return SQL query with columns (key, class, log_prob).
    For class c and the attribute array identified by key k, log_prob is the
    sum over all (categorical) attributes i of log( P(A_i = a_i(k) | C = c) );
    the class prior log( P(C = c) ) is added separately, see
    __get_class_prior_prob_values_sql().
    For each key k and class c, the query also contains a row (k, c, NULL). This
    is for technical reasons (we want every key-class pair to appear in the
    query; NULL serves as a default value if there is insufficient training data
    to compute a probability value).
@param numAttrs Number of attributes to use for classification
@param classifySource Name of the relation that contains data to be classified
@param classifyKeyColumn Name of column in \em classifySource that can
serve as unique identifier
@param classifyAttrColumn Name of attributes-array column in \em classifySource
@param classPriorsSource
Relation (class, class_cnt, all_cnt) where
class is c, class_cnt is \#c, all_cnt is the number of training
samples.
@param featureProbsSource
Relation (class, attr, value, cnt, attr_cnt) where
(class, attr, value) = (c,i,a), cnt = \#(c,i,a), and attr_cnt = \#i
    @param smoothingFactor Smoothing factor for computing feature
           probabilities. Default value: 1.0 (Laplacian Smoothing).
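    In terms of the counts above, the per-attribute estimate whose logarithm
    is summed below is (a sketch of the query's semantics, with smoothing
    factor \f$ s \f$):
    \f[ P(A_i = a \mid C = c) \approx \frac{\#(c,i,a) + s}{\#c + s \cdot \#i} \f]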
"""
# {classifySource} cannot be a subquery, because we use it more than once in
# our generated SQL.
return """
SELECT t2.key, t2.class, t1.log_prob FROM
(
SELECT
classify.key,
classPriors.class,
CASE WHEN count(*) < 1 THEN 0 -- ignoring NULL attributes for now
-- {numAttrs} THEN NULL
ELSE
sum( log((featureProbs.cnt::DOUBLE PRECISION +
{smoothingFactor})/ (classPriors.class_cnt +
{smoothingFactor} * featureProbs.attr_cnt)) )
END
AS log_prob
FROM
{featureProbsSource} AS featureProbs,
{classPriorsSource} AS classPriors,
(
SELECT
classifySource.{classifyKeyColumn} AS key,
generate_series(1, {numAttrs}) AS attr,
unnest(classifySource.{classifyAttrColumn}) AS value
FROM
{classifySource} AS classifySource
WHERE
{schema_madlib}.array_contains_null({classifyAttrColumn}) = 'f'
) AS classify
WHERE
featureProbs.class = classPriors.class AND
featureProbs.attr = classify.attr AND
( CASE WHEN classify.value is NULL
THEN false
ELSE featureProbs.value = classify.value
END
)
AND
({smoothingFactor} > 0 OR featureProbs.cnt > 0) -- prevent division by 0
GROUP BY
classify.key, classPriors.class, classPriors.class_cnt, classPriors.all_cnt
) t1
RIGHT OUTER JOIN
(
SELECT
classify.{classifyKeyColumn} AS key,
classes.class
FROM
{classifySource} AS classify,
{classPriorsSource} AS classes
WHERE
{schema_madlib}.array_contains_null({classifyAttrColumn}) = 'f'
GROUP BY classify.{classifyKeyColumn}, classes.class
) t2
ON t1.key = t2.key AND t1.class=t2.class
""".format(**kwargs)
def __get_keys_and_probs_values_numeric_attrs_sql(**kwargs):
"""
    This calculates the log probabilities of numeric attributes
    based on the Gaussian smoothing formula.
    The result is a table of the form (key, class, <sum of log probabilities
    over all numeric attributes>). These probabilities can then be added to
    the log probabilities obtained from __get_keys_and_prob_values_sql() and
    the class priors computed in __get_class_prior_prob_values_sql() to obtain
    the final log probabilities.
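    For a numeric attribute with per-class mean \f$ \mu \f$ and variance
    \f$ \sigma^2 \f$ (attr_mean and attr_var), the term summed below is the
    base-10 logarithm of the normal density (a sketch of the query's
    semantics; note that PostgreSQL's log() is base 10):
    \f[ \log_{10} N(x; \mu, \sigma^2) = -\frac{1}{2}\left( \log_{10}(2\pi\sigma^2)
        + \log_{10}(e)\,\frac{(x-\mu)^2}{\sigma^2} \right) \f]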
@param numAttrs Number of attributes to use for classification
@param classifySource Name of the relation that contains data to be classified
@param classifyKeyColumn Name of column in \em classifySource that can
serve as unique identifier
@param classifyAttrColumn Name of attributes-array column in \em classifySource
@param classPriorsSource
Relation (class, class_cnt, all_cnt) where
class is c, class_cnt is \#c, all_cnt is the number of training
samples.
@param numericFeatureStatsSource
Relation (class, attr, attr_mean, attr_var) for gaussian smoothing
of continuous attributes.
"""
# {classifySource} cannot be a subquery, because we use it more than once in
# our generated SQL.
return """
SELECT s2.key, s2.class, s1.log_prob FROM
(
SELECT
classify.key,
numericFeatureStatsSource.class,
sum(-0.5*(log(2*pi()*attr_var::DOUBLE PRECISION) + log(exp(1.0))*(((attr_mean::DOUBLE PRECISION-value)^2)/(attr_var::DOUBLE PRECISION))))
AS log_prob
FROM
{numericFeatureStatsSource} AS numericFeatureStatsSource,
(
SELECT
classifySource.{classifyKeyColumn} AS key,
generate_series(1, {numAttrs}) AS attr,
unnest(classifySource.{classifyAttrColumn}) AS value
FROM
{classifySource} AS classifySource
WHERE
{schema_madlib}.array_contains_null({classifyAttrColumn}) = 'f'
) AS classify
WHERE
numericFeatureStatsSource.attr = classify.attr
GROUP BY
classify.key,numericFeatureStatsSource.class
) s1
RIGHT OUTER JOIN
(
SELECT
classify.{classifyKeyColumn} AS key,
classes.class
FROM
{classifySource} AS classify,
{classPriorsSource} AS classes
WHERE
{schema_madlib}.array_contains_null({classifyAttrColumn}) = 'f'
GROUP BY classify.{classifyKeyColumn}, classes.class
) s2
ON s1.key = s2.key AND s1.class=s2.class
""".format(**kwargs)
def __get_classification_sql(**kwargs):
"""
Return SQL query with columns (key, nb_classification, nb_log_probability)
@param keys_and_prob_values Relation (key, class, log_prob)
"""
kwargs.update(
keys_and_prob_values = "(" + __get_keys_and_prob_values_sql(**kwargs) + ")"
)
return """
SELECT
key,
{schema_madlib}.argmax(class, log_prob) AS nb_classification,
max(log_prob) AS nb_log_probability
FROM {keys_and_prob_values} AS keys_and_nb_values
GROUP BY key
""".format(**kwargs)
def create_prepared_data_table(**kwargs):
"""Wrapper around create_prepared_data() to create a table (and not a view)
"""
old_msg_level = plpy.execute("select setting from pg_settings where \
name='client_min_messages'")[0]['setting']
plpy.execute("set client_min_messages to error")
kwargs.update(whatToCreate = 'TABLE')
__nb_validate_args(**kwargs)
create_prepared_data(**kwargs)
plpy.execute("set client_min_messages to " + old_msg_level)
def create_prepared_data(**kwargs):
"""Precompute all class priors and feature probabilities.
When the precomputations are stored in a table, this function will create
indices that speed up lookups necessary for Naive Bayes classification.
Moreover, it runs ANALYZE on the new tables to allow for optimized query
plans.
Class priors are stored in a relation with columns
(class, class_cnt, all_cnt).
@param trainingSource Name of relation containing the training data
@param trainingClassColumn Name of class column in training data
@param trainingAttrColumn Name of attributes-array column in training data
    @param numericAttrsColumnIndices Array of 1-based indices identifying which attributes in trainingAttrColumn are numeric
@param numAttrs Number of attributes to use for classification
@param whatToCreate (Optional) Either \c 'TABLE' OR \c 'VIEW' (the default).
@param classPriorsDestName Name of class-priors relation to create
@param featureProbsDestName Name of feature-probabilities relation to create
@param numericFeatureStatsDestName Name of the statistics table for numeric attributes
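    A sketch of how the table-creating wrapper might be invoked through this
    keyword interface (all table and column names below are hypothetical):
    @verbatim
    create_prepared_data_table(
        schema_madlib        = 'madlib',
        trainingSource       = 'nb_training',
        trainingClassColumn  = 'class',
        trainingAttrColumn   = 'attributes',
        numAttrs             = 3,
        classPriorsDestName  = 'nb_class_priors',
        featureProbsDestName = 'nb_feature_probs')
    @endverbatim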
"""
if not kwargs.has_key('numAttrs'):
plpy.error("'numAttrs' must be provided")
if not kwargs.has_key('trainingSource'):
plpy.error("'trainingSource' must be provided")
if not kwargs.has_key('trainingAttrColumn'):
plpy.error("'trainingAttrColumn' must be provided")
__verify_attr_num(
kwargs["trainingSource"],
kwargs["trainingAttrColumn"],
kwargs["numAttrs"])
# if only one of these is present, error out
if kwargs.has_key('numericAttrsColumnIndices') ^ kwargs.has_key('numericFeatureStatsDestName'):
plpy.error("Both 'numericAttrsColumnIndices' and 'numericFeatureStatsDestName' must be provided");
# verify if the attributes specified in the numericAttrsColumnIndices are
# really numeric in nature. TODO
if kwargs.has_key('numericAttrsColumnIndices') and kwargs.has_key('numericFeatureStatsDestName'):
__verify_numeric_attr_type(
kwargs["trainingSource"],
kwargs["trainingAttrColumn"],
kwargs["numericAttrsColumnIndices"],
kwargs["numAttrs"]);
    # create the tables for the categorical attributes
if kwargs['whatToCreate'] == 'TABLE':
# FIXME: ANALYZE is not portable.
kwargs.update(dict(
attrCountsSource = '_madlib_nb_attr_counts',
attrValuesSource = '_madlib_nb_attr_values'
))
plpy.execute("""
DROP TABLE IF EXISTS {attrCountsSource};
CREATE TEMPORARY TABLE {attrCountsSource}
AS
{attr_counts_sql}
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (attr)');
ALTER TABLE {attrCountsSource} ADD PRIMARY KEY (attr);
ANALYZE {attrCountsSource};
DROP TABLE IF EXISTS {attrValuesSource};
CREATE TEMPORARY TABLE {attrValuesSource}
AS
{attr_values_sql}
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (attr, value)');
ALTER TABLE {attrValuesSource} ADD PRIMARY KEY (attr, value);
ANALYZE {attrValuesSource};
""".format(
attrCountsSource = kwargs['attrCountsSource'],
attrValuesSource = kwargs['attrValuesSource'],
attr_counts_sql = "(" + __get_attr_counts_sql(**kwargs) + ")",
attr_values_sql = "(" + __get_attr_values_sql(**kwargs) + ")"
)
)
kwargs.update(dict(
sql = __get_class_priors_sql(**kwargs)
))
plpy.execute("""
CREATE {whatToCreate} {classPriorsDestName}
AS
{sql}
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (class)');
""".format(**kwargs)
)
if kwargs['whatToCreate'] == 'TABLE':
plpy.execute("""
ALTER TABLE {classPriorsDestName} ADD PRIMARY KEY (class);
ANALYZE {classPriorsDestName};
""".format(**kwargs))
kwargs.update(dict(
classPriorsSource = kwargs['classPriorsDestName']
))
    # Create the attr-mean-var-class table (for later use in computing probs).
    # This can be computed directly with a SQL query; no intermediate table is
    # needed. Also update the reference to this globally created table here.
if kwargs.has_key('numericAttrsColumnIndices') and kwargs.has_key('numericFeatureStatsDestName'):
plpy.execute("""
CREATE {whatToCreate} {numericFeatureStatsDestName}
AS
{sql}
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (class, attr)');
""".format(
whatToCreate = kwargs['whatToCreate'],
numericFeatureStatsDestName = kwargs['numericFeatureStatsDestName'],
sql = __get_attr_numeric_stats_sql(**kwargs)
)
)
else:
kwargs.update(dict(
numericAttrsColumnIndices = 'ARRAY[]'
))
kwargs.update(dict(
sql = __get_feature_probs_sql(**kwargs)
))
plpy.execute("""
CREATE {whatToCreate} {featureProbsDestName}
AS
{sql}
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (class, attr, value)');
""".format(**kwargs)
)
if kwargs['whatToCreate'] == 'TABLE':
plpy.execute("""
ALTER TABLE {featureProbsDestName} ADD PRIMARY KEY (class, attr, value);
ANALYZE {featureProbsDestName};
DROP TABLE {attrCountsSource};
DROP TABLE {attrValuesSource};
""".format(**kwargs))
if kwargs.has_key('numericAttrsColumnIndices') and kwargs.has_key('numericFeatureStatsDestName'):
if kwargs['whatToCreate'] == 'TABLE':
plpy.execute("""
ALTER TABLE {numericFeatureStatsDestName} ADD PRIMARY KEY (class, attr);
ANALYZE {numericFeatureStatsDestName};
""".format(**kwargs))
def create_classification_view(**kwargs):
"""Wrapper around create_classification() to create a view (and not a table)
"""
old_msg_level = plpy.execute("select setting from pg_settings where \
name='client_min_messages'")[0]['setting']
plpy.execute("set client_min_messages to error")
kwargs.update(whatToCreate = 'VIEW')
__nb_validate_args(**kwargs)
create_classification(**kwargs)
plpy.execute("set client_min_messages to " + old_msg_level)
def create_classification(**kwargs):
"""
Create a view/table with columns (key, nb_classification).
The created relation will be
<tt>{TABLE|VIEW} <em>destName</em> (key, nb_classification)</tt>
where \c nb_classification is an array containing the most likely
class(es) of the record in \em classifySource identified by \c key.
There are two sets of arguments this function can be called with. The
following parameters are always needed:
@param numAttrs Number of attributes to use for classification
@param destName Name of the table or view to create
@param whatToCreate (Optional) Either \c 'TABLE' OR \c 'VIEW' (the default).
    @param smoothingFactor (Optional) Smoothing factor for computing feature
           probabilities. Default value: 1.0 (Laplacian Smoothing).
@param classifySource Name of the relation that contains data to be classified
@param classifyKeyColumn Name of column in \em classifySource that can
serve as unique identifier
@param classifyAttrColumn Name of attributes-array column in \em classifySource
Furthermore, provide either:
@param classPriorsSource
Relation (class, class_cnt, all_cnt) where
class is c, class_cnt is \#c, all_cnt is the number of training
samples.
@param featureProbsSource
Relation (class, attr, value, cnt, attr_cnt) where
(class, attr, value) = (c,i,a), cnt = \#(c,i,a), and attr_cnt = \#i
In addition to the above, for numeric attributes provide:
@param numericFeatureStatsSource
        Relation (class, attr, attr_mean, attr_var) where
        attr_mean and attr_var are the mean and variance of attr, respectively
Or have this function operate on the "raw" training data:
@param trainingSource
Name of relation containing the training data
@param trainingClassColumn
Name of class column in training data
@param trainingAttrColumn
Name of attributes-array column in \em trainingSource
In addition to the above, for numeric attributes provide:
@param numericAttrsColumnIndices
Array containing attribute indices of numeric attributes
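    A sketch of a classifying call against precomputed model tables (all
    table and column names below are hypothetical):
    @verbatim
    create_classification_view(
        schema_madlib      = 'madlib',
        classifySource     = 'nb_to_classify',
        classifyKeyColumn  = 'id',
        classifyAttrColumn = 'attributes',
        numAttrs           = 3,
        classPriorsSource  = 'nb_class_priors',
        featureProbsSource = 'nb_feature_probs',
        destName           = 'nb_classification')
    @endverbatim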
"""
__init_prepared_data(kwargs)
    if kwargs.has_key('trainingSource') != kwargs.has_key('trainingAttrColumn'):
plpy.error("'trainingSource' and 'trainingAttrColumn' must be provided together")
if not kwargs.has_key('numAttrs'):
plpy.error("'numAttrs' must be provided")
if 'trainingSource' in kwargs:
__verify_attr_num(
kwargs["trainingSource"],
kwargs["trainingAttrColumn"],
kwargs["numAttrs"])
if not kwargs.has_key('classifySource'):
plpy.error("'classifySource' must be provided")
if not kwargs.has_key('classifyAttrColumn'):
plpy.error("'classifyAttrColumn' must be provided")
__verify_attr_num(
kwargs["classifySource"],
kwargs["classifyAttrColumn"],
kwargs["numAttrs"])
kwargs.update(
keys_and_prob_values = "(" + __get_keys_and_prob_values_sql(**kwargs) + ")",
class_prior_prob_values = "(" + __get_class_prior_prob_values_sql(**kwargs) + ")"
)
# if there are any numeric attributes in the test data, compute their probabilities as well.
if kwargs.has_key('numericFeatureStatsSource'):
kwargs.update(dict(
numeric_attrs_keys_and_prob_values = "(" + __get_keys_and_probs_values_numeric_attrs_sql(**kwargs) + ")"
))
# now add these probabilities to the original table
plpy.execute("""
CREATE {whatToCreate} {destName} AS
SELECT
p.key,
{schema_madlib}.argmax(p.class, (p.log_prob)) AS nb_classification
FROM(
SELECT
key,
class,
sum(log_prob) as log_prob
FROM(
{keys_and_prob_values}
UNION ALL
{numeric_attrs_keys_and_prob_values}
UNION ALL
{class_prior_prob_values}
) x
GROUP BY key,class
) p
GROUP BY p.key
""".format(**kwargs))
else:
plpy.execute("""
CREATE {whatToCreate} {destName} AS
SELECT
key,
{schema_madlib}.argmax(class, log_prob) AS nb_classification
FROM (
SELECT
key,
class,
sum(log_prob) as log_prob
FROM(
{keys_and_prob_values}
UNION ALL
{class_prior_prob_values}
) x
GROUP BY key,class
) AS keys_and_nb_values
GROUP BY key
""".format(**kwargs))
def create_bayes_probabilities_view(**kwargs):
"""Wrapper around create_bayes_probabilities() to create a view (and not a table)
"""
old_msg_level = plpy.execute("select setting from pg_settings where \
name='client_min_messages'")[0]['setting']
plpy.execute("set client_min_messages to error")
kwargs.update(whatToCreate = 'VIEW')
__nb_validate_args(**kwargs)
create_bayes_probabilities(**kwargs)
plpy.execute("set client_min_messages to " + old_msg_level)
def create_bayes_probabilities(**kwargs):
"""Create table/view with columns (key, class, nb_prob)
The created relation will be
<tt>{TABLE|VIEW} <em>destName</em> (key, class, nb_prob)</tt>
where \c nb_prob is the Naive-Bayes probability that \c class is the true
class of the record in \em classifySource identified by \c key.
There are two sets of arguments this function can be called with. The
following parameters are always needed:
@param numAttrs Number of attributes to use for classification
@param destName Name of the table or view to create
@param whatToCreate (Optional) Either \c 'TABLE' OR \c 'VIEW' (the default).
    @param smoothingFactor (Optional) Smoothing factor for computing feature
           probabilities. Default value: 1.0 (Laplacian Smoothing).
Furthermore, provide either:
@param classPriorsSource
Relation (class, class_cnt, all_cnt) where
class is c, class_cnt is \#c, all_cnt is the number of training
samples.
@param featureProbsSource
Relation (class, attr, value, cnt, attr_cnt) where
(class, attr, value) = (c,i,a), cnt = \#(c,i,a), and attr_cnt = \#i
In addition to the above, for numeric attributes provide:
@param numericFeatureStatsSource
        Relation (class, attr, attr_mean, attr_var) where
        attr_mean and attr_var are the mean and variance of attr, respectively
Or have this function operate on the "raw" training data:
@param trainingSource
Name of relation containing the training data
@param trainingClassColumn
Name of class column in training data
@param trainingAttrColumn
Name of attributes-array column in training data
In addition to the above, for numeric attributes provide:
@param numericAttrsColumnIndices
Array containing attribute indices of numeric attributes
@internal
\par Implementation Notes:
    We have two numerical problems when computing the probabilities
    \f[
        P(C = c \mid A = a)
          = \frac{P(C = c) \, P(A = a \mid C = c)}
                 {\sum_{c'} P(C = c') \, P(A = a \mid C = c')}
        \qquad (*)
    \f]
    where \f$ P(A = a \mid C = c) = \prod_i P(A_i = a_i \mid C = c) \f$.
    1. P(A = a | C = c) could be a very small number not representable in
       double-precision floating-point arithmetic.
       - Solution: We have log( P(C = c) * P(A = a | C = c) ) as intermediate
         results. We subtract the per-key maximum of these intermediate results
         from all of them. This corresponds to multiplying numerator and
         denominator of (*) by the same factor. The "normalization" ensures
         that the numerator of (*) can never be 0 (in FP arithmetic) for all c.
    2. PostgreSQL raises an error in case of underflows, even when 0 is the
       desirable outcome.
       - Solution: if log_10( P(A = a | C = c) ) < -300, we interpret
         P(A = a | C = c) = 0. Note that 1e-300 is roughly of the order of
         magnitude of the smallest double-precision FP number.
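    A small worked example of the normalization (hypothetical numbers), for a
    single key with three classes:
    @verbatim
    log-probs:            -420    -422    -430
    minus per-key max:       0      -2     -10
    pow(10, .):              1    1e-2   1e-10    (instead of underflowing)
    @endverbatim
    Dividing each value by their sum then yields the final nb_prob values.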
@endinternal
"""
__init_prepared_data(kwargs)
    if kwargs.has_key('trainingSource') != kwargs.has_key('trainingAttrColumn'):
plpy.error("'trainingSource' and 'trainingAttrColumn' must be provided together")
if not kwargs.has_key('numAttrs'):
plpy.error("'numAttrs' must be provided")
if 'trainingSource' in kwargs:
__verify_attr_num(
kwargs["trainingSource"],
kwargs["trainingAttrColumn"],
kwargs["numAttrs"])
if not kwargs.has_key('classifySource'):
plpy.error("'classifySource' must be provided")
if not kwargs.has_key('classifyAttrColumn'):
plpy.error("'classifyAttrColumn' must be provided")
__verify_attr_num(
kwargs["classifySource"],
kwargs["classifyAttrColumn"],
kwargs["numAttrs"])
kwargs.update(dict(
keys_and_prob_values = "(" + __get_keys_and_prob_values_sql(**kwargs) + ")",
class_prior_prob_values = "(" + __get_class_prior_prob_values_sql(**kwargs) + ")"
))
# if there are any numeric attributes in the test data, compute their probabilities as well.
if kwargs.has_key('numericFeatureStatsSource'):
kwargs.update(dict(
numeric_attrs_keys_and_prob_values = "(" + __get_keys_and_probs_values_numeric_attrs_sql(**kwargs) + ")"
))
#use the modified version of the query to calculate final probabilities
#by adding the probabilities for the numeric attrs as well
plpy.execute("""
CREATE {whatToCreate} {destName} AS
SELECT
key,
class,
nb_prob / sum(nb_prob) OVER (PARTITION BY key) AS nb_prob
FROM
(
SELECT
p.key,
p.class,
CASE WHEN max(p.log_prob) - max(max(p.log_prob)) OVER (PARTITION BY p.key) < -300 THEN 0
ELSE pow(10, max(p.log_prob) - max(max(p.log_prob)) OVER (PARTITION BY p.key))
END AS nb_prob
FROM
(
SELECT
key,
class,
sum(log_prob) as log_prob
FROM(
{keys_and_prob_values}
UNION ALL
{numeric_attrs_keys_and_prob_values}
UNION ALL
{class_prior_prob_values}
) x
GROUP BY key,class
) p
GROUP BY
p.key, p.class
) AS keys_and_nb_values
ORDER BY
key, class
""".format(**kwargs))
else:
plpy.execute("""
CREATE {whatToCreate} {destName} AS
SELECT
key,
class,
nb_prob / sum(nb_prob) OVER (PARTITION BY key) AS nb_prob
FROM
(
SELECT
key,
class,
CASE WHEN max(log_prob) - max(max(log_prob)) OVER (PARTITION BY key) < -300 THEN 0
ELSE pow(10, max(log_prob) - max(max(log_prob)) OVER (PARTITION BY key))
END AS nb_prob
FROM(
SELECT
key,
class,
sum(log_prob) as log_prob
FROM(
{keys_and_prob_values}
UNION ALL
{class_prior_prob_values}
) x
GROUP BY key,class
) p
GROUP BY
key, class
) AS keys_and_nb_values
ORDER BY
key, class
""".format(**kwargs))
def __init_prepared_data(kwargs):
"""
Fill in values for optional parameters: Create subqueries instead of using
a relation.
"""
if not 'classPriorsSource' in kwargs:
kwargs.update(dict(
classPriorsSource = "(" + __get_class_priors_sql(**kwargs) + ")"
))
if kwargs.has_key('numericAttrsColumnIndices') and 'numericFeatureStatsSource' not in kwargs:
# sanity check the indices before executing any sql query
__verify_numeric_attr_type(
kwargs["trainingSource"],
kwargs["trainingAttrColumn"],
kwargs["numericAttrsColumnIndices"],
kwargs["numAttrs"]);
kwargs.update(dict(
numericFeatureStatsSource = "(" + __get_attr_numeric_stats_sql(**kwargs) + ")"
))
elif 'numericFeatureStatsSource' in kwargs:
kwargs.update(dict(
numericAttrsColumnIndices = __get_numeric_attr_indices(kwargs['numericFeatureStatsSource'])
))
if not 'featureProbsSource' in kwargs:
kwargs.update(dict(
featureProbsSource = "(" + __get_feature_probs_sql(**kwargs) + ")"
))
if not 'smoothingFactor' in kwargs:
kwargs.update(dict(
smoothingFactor = 1
))
def __verify_attr_num(sourceTable, attrColumn, numAttr):
"""
Verify the number of attributes for each record is as expected.
"""
result = plpy.execute("""
SELECT count(*) size FROM {0}
WHERE array_upper({1}, 1) <> {2}
""".format(sourceTable, attrColumn, numAttr)
)
dsize = result[0]['size']
    if dsize != 0:
plpy.error('found %d records in "%s" where "%s" was not of expected length (%d)'\
% (dsize, sourceTable, attrColumn, numAttr))
def __verify_numeric_attr_type(sourceTable, attrColumn, numericAttrsColumnIndices, numAttr):
"""
    Verify that the attribute indices provided in numericAttrsColumnIndices
    are valid, i.e., integers in the range [1, numAttr]
"""
#check if all data is composed of integers
result = plpy.execute("""
SELECT * FROM (
SELECT unnest({0}::TEXT[])
) x
WHERE x.unnest !~ '^[0-9]+$'
""".format(numericAttrsColumnIndices)
)
if len(result)>0:
plpy.error("Invalid array index %s specified for numeric attribute"%result[0]['unnest'])
#check for ranges
result = plpy.execute("""
SELECT max(unnest),min(unnest)
FROM unnest({0}::INTEGER[])
""".format(numericAttrsColumnIndices)
)
#empty array
if result[0]['max'] is None and result[0]['min'] is None:
plpy.error('No indices specified for numeric attributes')
#bounds checking
maxidx = int(result[0]['max']);
minidx = int(result[0]['min']);
nattrs = int(numAttr)
if maxidx > nattrs :
plpy.error('Invalid index value %d is greater than the number of attributes'%maxidx);
if minidx < 1:
plpy.error('Invalid index value %d is less than 1'%minidx);