# ------------------------------------------------------------------------
# Compute clustered errors
# ------------------------------------------------------------------------
import plpy

from utilities.utilities import _assert
from utilities.utilities import _string_to_array_with_quotes
from utilities.utilities import add_postfix
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import rename_table
from utilities.utilities import unique_string
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_exists
from utilities.validate_args import table_is_empty
# ========================================================================
def __prepare_strings(clustervar, grouping_col):
"""
Prepare the needed strings
"""
    fitres = unique_string()  # result table for the underlying regression fit
if grouping_col is None:
grouping_col_str = "NULL"
else:
grouping_col_str = "'" + grouping_col + "'"
coef_str = unique_string()
if clustervar is None:
cluster_grouping_str = "group by {coef_str}".format(coef_str=coef_str)
else:
cluster_grouping_str = "group by {coef_str}, ".format(coef_str=coef_str) + clustervar
return (fitres, coef_str, cluster_grouping_str, grouping_col_str)
# ------------------------------------------------------------------------
def __generate_clustered_sql(**kwargs):
"""
Create the SQL query to execute and create the result table
"""
sqlPart1 = """
create table {out_table} as
select (f).* from (
select {schema_madlib}.__clustered_{regr_type}_compute_stats(
max(coef),
m4_ifdef(`__POSTGRESQL__', `{schema_madlib}.__array_')sum(meatvec),
m4_ifdef(`__POSTGRESQL__', `{schema_madlib}.__array_')sum(breadvec),
count(numRows)::integer, (sum(numRows))::integer) as f
from (
select (g).*, coef, numRows from (
select """.format(**kwargs)
if kwargs['regr_type'] == 'mlog':
        # For the mlog regression we also need the number of categories
        # and the reference category
sqlPart2 = """{schema_madlib}.__clustered_err_{regr_type}_step(
({depvar})::INTEGER,
({indvar})::DOUBLE PRECISION[],
({coef_str})::DOUBLE PRECISION[],
(select count(distinct {depvar})
from {source_table})::INTEGER,
({ref_category})::INTEGER) as g,""".format(**kwargs)
elif kwargs['regr_type'] == 'log':
sqlPart2 = """{schema_madlib}.__clustered_err_{regr_type}_step(
({depvar})::BOOLEAN,
({indvar})::DOUBLE PRECISION[],
({coef_str})::DOUBLE PRECISION[]) as g,""".format(**kwargs)
else:
sqlPart2 = """{schema_madlib}.__clustered_err_{regr_type}_step(
{depvar},
({indvar})::DOUBLE PRECISION[],
({coef_str})::DOUBLE PRECISION[]) as g,""".format(**kwargs)
sqlPart3 = """{coef_str} as coef,
count({depvar}) as numRows
FROM (
SELECT u.coef as {coef_str}, v.*
FROM
{fitres} u, {source_table} v
where
({indvar}) is not NULL and
{schema_madlib}.array_contains_null({indvar}) is False and
({depvar}) is not NULL
) s
{cluster_grouping_str}) t) p) q
""".format(**kwargs)
return sqlPart1 + sqlPart2 + sqlPart3
# ========================================================================
def clustered_variance_linregr(schema_madlib, source_table, out_table,
depvar, indvar, clustervar, grouping_col,
**kwargs):
"""
Linear regression clustered standard errors
"""
old_msg_level = plpy.execute("select setting from pg_settings where \
name='client_min_messages'")[0]['setting']
plpy.execute("set client_min_messages to error")
validate_args_clustered_variance_linregr(schema_madlib, source_table,
out_table, depvar, indvar,
clustervar, grouping_col)
(fitres, coef_str, cluster_grouping_str,
grouping_col_str) = __prepare_strings(clustervar, grouping_col)
plpy.execute(
"""
select {schema_madlib}.linregr_train('{source_table}',
'{fitres}', '{depvar}', '{indvar}', {grouping_col})
""".format(schema_madlib=schema_madlib, source_table=source_table,
fitres=fitres, depvar=depvar, indvar=indvar,
grouping_col=grouping_col_str))
out_table_summary = add_postfix(out_table, "_summary")
fitres_summary = add_postfix(fitres, "_summary")
# Rename the output summary table
rename_table(schema_madlib, fitres_summary, out_table_summary)
plpy.execute("UPDATE {0} SET out_table = '{1}'".
format(out_table_summary, out_table))
plpy.execute(
__generate_clustered_sql(schema_madlib=schema_madlib, depvar=depvar,
indvar=indvar, coef_str=coef_str,
source_table=source_table, fitres=fitres,
cluster_grouping_str=cluster_grouping_str,
out_table=out_table, regr_type="lin"))
plpy.execute("drop table if exists {fitres}".format(fitres=fitres))
plpy.execute("set client_min_messages to " + old_msg_level)
return None
# ========================================================================
def validate_args_clustered_variance(schema_madlib, source_table,
out_table, depvar, indvar,
clustervar, grouping_col):
"""
Validate the parameters
"""
if not source_table or source_table.lower() in ('null', '') or not table_exists(source_table):
plpy.error("Clustered variance estimation error: Data table does not exist!")
if table_is_empty(source_table):
plpy.error("Clustered variance estimation error: Data table is empty!")
if out_table is None or out_table.lower() in ('null', ''):
plpy.error("Clustered variance estimation error: Invalid output table name!")
if table_exists(out_table, only_first_schema=True):
plpy.error("Clustered variance estimation error: Output table exists!")
if table_exists(add_postfix(out_table, "_summary"), only_first_schema=True):
plpy.error("Clustered variance estimation error: Output summary table exists!")
if depvar is None or (not isinstance(depvar, str)) or depvar.lower() in ('null', ''):
plpy.error("Clustered variance estimation error: Invalid dependent column name!")
if indvar is None or (not isinstance(indvar, str)) or indvar.lower() in ('null', ''):
plpy.error("Clustered variance estimation error: Invalid independent column name!")
# try:
# plpy.execute("select {indvar}".format(indvar = indvar))
# isconst = True
# except:
# isconst = False
# if isconst:
# plpy.error("Clustered variance estimation error: independent variable is a constant!")
# try:
# plpy.execute("select {depvar}".format(depvar = depvar))
# notconst = False
# except:
# notconst = True
# if notconst is False:
# plpy.error("Clustered variance estimation error: dependent variable is a constant!")
# try:
# plpy.execute("select {indvar} from {tbl} limit 1".format(indvar = indvar, tbl = source_table))
# success = True
# except:
# success = False
# if success is False:
# plpy.error("Clustered variance estimation error: independent variable does not exist in the data table!")
# try:
# plpy.execute("select {depvar} from {tbl} limit 1".format(depvar = depvar, tbl = source_table))
# success = True
# except:
# success = False
# if not success:
# plpy.error("Clustered variance estimation error: dependent variable does not exist in the data table!")
# if not scalar_col_has_no_null(source_table, depvar):
# plpy.error("Clustered variance estimation error: Dependent variable has Null values! \
# Please filter out Null values before using this function!")
    if clustervar is None or clustervar.lower() in ('null', ''):
        # clustervar is required and must name one or more existing columns
        plpy.error("Clustered variance estimation error: Invalid cluster column names!")
if clustervar is not None:
if not columns_exist_in_table(source_table,
_string_to_array_with_quotes(clustervar),
schema_madlib):
plpy.error("Clustered variance estimation error: Cluster column does not exist!")
    if grouping_col is not None and grouping_col.lower() in ('null', ''):
        # grouping_col is optional, but if provided it must name existing columns
        plpy.error("Clustered variance estimation error: Invalid grouping column names!")
if grouping_col:
if not columns_exist_in_table(source_table,
_string_to_array_with_quotes(grouping_col),
schema_madlib):
plpy.error("Clustered variance estimation error: Grouping column does not exist!")
# ========================================================================
def validate_args_clustered_variance_linregr(schema_madlib, source_table,
out_table, depvar, indvar,
clustervar, grouping_col):
"""
Validate the parameters
"""
validate_args_clustered_variance(schema_madlib, source_table,
out_table, depvar, indvar,
clustervar, grouping_col)
# ========================================================================
def clustered_variance_linregr_help(schema_madlib, msg=None, **kwargs):
"""
Print help messages
"""
    if msg is None or msg.strip(' ') in ('help', '?'):
return """
----------------------------------------------------------------
Summary
----------------------------------------------------------------
Computes the clustered standard errors for linear regression.
The function first runs a linear regression to get the fitting
coefficients. Then it computes the clustered standard errors for
the linear regression.
SELECT {schema_madlib}.clustered_variance_linregr(
'source_table',
'out_table',
'depvar',
'indvar',
'clustervar',
'grouping_col'
);
--
Run:
SELECT {schema_madlib}.clustered_variance_linregr('usage');
to get more information.
""".format(schema_madlib=schema_madlib)
if msg.strip(' ') == 'usage':
return """
Usage:
----------------------------------------------------------------
SELECT {schema_madlib}.clustered_variance_linregr(
'source_table', -- Name of data table
'out_table', -- Name of result table (raise an error if it already exists)
'depvar', -- Expression for dependent variable
'indvar', -- Expression for independent variables
'clustervar', -- Column names for cluster variables, separated by comma
'grouping_col' -- Grouping regression column names, separated by comma, default NULL
);
Output:
----------------------------------------------------------------
The output table has the following columns:
coef DOUBLE PRECISION[], -- Fitting coefficients
std_err DOUBLE PRECISION[], -- Clustered standard errors for coef
t_stats DOUBLE PRECISION[], -- t-stats of the errors
p_values DOUBLE PRECISION[] -- p-values of the errors
The output summary table is the same as linregr_train(), see also:
SELECT linregr_train('usage');
""".format(schema_madlib=schema_madlib)
# ========================================================================
# Clustered errors for logistic regression
# ========================================================================
def clustered_variance_logregr(schema_madlib, source_table, out_table,
depvar, indvar, clustervar, grouping_col,
max_iter, optimizer, tolerance, verbose_mode,
**kwargs):
"""
Logistic regression clustered standard errors
"""
if optimizer is not None and optimizer.lower() == 'newton':
optimizer = 'irls'
validate_args_clustered_variance_logregr(schema_madlib, source_table,
out_table, depvar, indvar,
clustervar, grouping_col,
max_iter, optimizer, tolerance,
verbose_mode)
(fitres, coef_str, cluster_grouping_str,
grouping_col_str) = __prepare_strings(clustervar, grouping_col)
plpy.execute(
"""
SELECT {schema_madlib}.logregr_train(
'{source_table}', '{fitres}', '{depvar}', '{indvar}',
{grouping_col}, {max_iter}, '{optimizer}', {tolerance},
{verbose})
""".format(schema_madlib=schema_madlib, source_table=source_table,
fitres=fitres, depvar=depvar, indvar=indvar,
grouping_col=grouping_col_str, max_iter=max_iter,
optimizer=optimizer, tolerance=tolerance,
verbose=verbose_mode))
old_msg_level = plpy.execute("select setting from pg_settings where \
name='client_min_messages'")[0]['setting']
if verbose_mode:
plpy.execute("set client_min_messages to warning")
else:
plpy.execute("set client_min_messages to error")
out_table_summary = add_postfix(out_table, "_summary")
fitres_summary = add_postfix(fitres, "_summary")
# Rename the output summary table
rename_table(schema_madlib, fitres_summary, out_table_summary)
plpy.execute("""UPDATE {0} SET out_table = '{1}'
""".format(out_table_summary, out_table))
plpy.execute(
__generate_clustered_sql(schema_madlib=schema_madlib, depvar=depvar,
indvar=indvar, coef_str=coef_str,
source_table=source_table, fitres=fitres,
cluster_grouping_str=cluster_grouping_str,
out_table=out_table, regr_type="log"))
plpy.execute("DROP TABLE IF EXISTS {fitres}".format(fitres=fitres))
plpy.execute("SET client_min_messages TO " + old_msg_level)
return None
# ========================================================================
def validate_args_clustered_variance_logregr(schema_madlib, source_table,
out_table, depvar, indvar,
clustervar, grouping_col,
max_iter, optimizer, tolerance,
verbose_mode):
"""
Validate the parameters
"""
validate_args_clustered_variance(schema_madlib, source_table,
out_table, depvar, indvar,
clustervar, grouping_col)
if max_iter is None or max_iter <= 0:
plpy.error("Clustered variance estimation error: Maximum number of "
"iterations must be a positive number!")
if tolerance is None or tolerance < 0:
plpy.error("Clustered variance estimation error: The tolerance must be "
"a non-negative number!")
if not isinstance(verbose_mode, bool):
plpy.error("Clustered variance estimation error: verbose_mode option must "
"be a boolean!")
if optimizer not in ("irls", "cg", "igd"):
plpy.error(""" Clustered variance estimation error: Unknown optimizer requested.
Must be 'newton'/'irls', 'cg', or 'igd'.
""")
# ========================================================================
def clustered_variance_logregr_help(schema_madlib, msg=None, **kwargs):
"""
Print help messages
"""
    if msg is None or msg.strip(' ') in ('help', '?'):
return """
----------------------------------------------------------------
Summary
----------------------------------------------------------------
Computes the clustered standard errors for logistic regression.
The function first runs a logistic regression to get the fitting
coefficients. Then it computes the clustered standard errors for
the logistic regression.
SELECT {schema_madlib}.clustered_variance_logregr(
'source_table',
'out_table',
'depvar',
'indvar',
'clustervar',
'grouping_col',
max_iter,
'optimizer',
tolerance,
verbose_mode
);
--
Run:
SELECT {schema_madlib}.clustered_variance_logregr('usage');
to get more information.
""".format(schema_madlib=schema_madlib)
if msg.strip(' ') == 'usage':
return """
Usage:
----------------------------------------------------------------
SELECT {schema_madlib}.clustered_variance_logregr(
'source_table', -- Name of data table
'out_table', -- Name of result table (raise an error if it already exists)
'depvar', -- Expression for dependent variable
'indvar', -- Expression for independent variables
'clustervar', -- Column names for cluster variables, separated by comma
'grouping_col', -- Grouping regression column names, separated by comma, default NULL
max_iter, -- Maximum iteration number for logistic regression, default 20
'optimizer', -- Optimization method for logistic regression, default 'irls'
            tolerance,          -- Stop the computation when the difference of likelihoods
                                -- in two consecutive iterations is smaller than this value. Default 0.0001
            verbose_mode        -- Whether to print detailed information when computing logistic regression,
                                -- default is False
);
Output:
----------------------------------------------------------------
The output table has the following columns:
coef DOUBLE PRECISION[], -- Fitting coefficients
std_err DOUBLE PRECISION[], -- Clustered standard errors for coef
z_stats DOUBLE PRECISION[], -- z-stats of the errors
p_values DOUBLE PRECISION[] -- p-values of the errors
The output summary table is the same as logregr_train(), see also:
SELECT logregr_train('usage');
""".format(schema_madlib=schema_madlib)
# ========================================================================
# Clustered errors for multi-logistic regression
# ========================================================================
def clustered_variance_mlogregr(
schema_madlib, source_table, out_table, dependent_varname,
independent_varname, cluster_varname, ref_category,
grouping_cols=None, optimizer_params=None, verbose_mode=None,
**kwargs):
"""
Multi-Logistic regression clustered standard errors
"""
old_msg_level = plpy.execute("select setting from pg_settings where \
name='client_min_messages'")[0]['setting']
plpy.execute("set client_min_messages to error")
# extract optimizer parameters
allowed_param_types = {
'max_iter': int,
'max_num_iterations': int,
'optimizer': str,
'tolerance': float,
'precision': float}
default_optimizer_values = {
'max_iter': 20,
'optimizer': 'irls',
'tolerance': 0.0001}
optimizer_param_dict = extract_keyvalue_params(
optimizer_params, allowed_param_types, default_optimizer_values)
if not optimizer_param_dict:
optimizer_param_dict = default_optimizer_values
if 'precision' in optimizer_param_dict and \
'tolerance' not in optimizer_param_dict:
optimizer_param_dict['tolerance'] = optimizer_param_dict['precision']
if 'max_num_iterations' in optimizer_param_dict and \
'max_iter' not in optimizer_param_dict:
optimizer_param_dict['max_iter'] = optimizer_param_dict['max_num_iterations']
if optimizer_param_dict['optimizer'].lower() == 'newton':
optimizer_param_dict['optimizer'] = 'irls'
validate_args_clustered_variance_mlogregr(
schema_madlib, source_table, out_table, dependent_varname,
independent_varname, cluster_varname, grouping_cols, ref_category,
verbose_mode, **optimizer_param_dict)
(fitres, coef_str, cluster_grouping_str, grouping_col_str) = __prepare_strings(cluster_varname, grouping_cols)
plpy.execute(
"""
SELECT
{schema_madlib}.mlogregr_train(
'{source_table}', '{fitres}',
'{dependent_varname}', '{independent_varname}', {ref_category},
'max_iter={max_iter}, optimizer={optimizer}, tolerance={tolerance}')
""".format(schema_madlib=schema_madlib, source_table=source_table,
fitres=fitres, dependent_varname=dependent_varname,
independent_varname=independent_varname,
ref_category=ref_category, **optimizer_param_dict))
out_table_summary = add_postfix(out_table, "_summary")
fitres_summary = add_postfix(fitres, "_summary")
# Rename the output summary table
rename_table(schema_madlib, fitres_summary, out_table_summary)
plpy.execute("UPDATE {0} SET out_table = '{1}'".
format(out_table_summary, out_table))
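    # mlogregr_train() stores one coefficient row per non-reference category;
    # matrix_agg collapses them into a single 2-D array so that max(coef) in the
    # generated clustered query sees the complete coefficient matrix.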
coef_table = unique_string()
plpy.execute("""
CREATE TEMP TABLE {coef_table} AS
SELECT
{schema_madlib}.matrix_agg(coef ORDER BY category) AS coef
FROM {fitres}
""".format(schema_madlib=schema_madlib, fitres=fitres, coef_table=coef_table))
plpy.execute(
"""
drop table if exists {fitres}
""".format(fitres=fitres))
plpy.execute(
__generate_clustered_sql(
schema_madlib=schema_madlib,
depvar=dependent_varname, indvar=independent_varname,
coef_str=coef_str, source_table=source_table, fitres=coef_table,
cluster_grouping_str=cluster_grouping_str, out_table=out_table,
ref_category=ref_category, regr_type="mlog"))
num_categories = plpy.execute(
"SELECT count(DISTINCT {0}) as n_cat FROM {1}".
format(dependent_varname, source_table))[0]['n_cat']
num_features = plpy.execute("""
SELECT
array_upper({independent_varname}, 1) fnum
FROM {source_table} LIMIT 1
""".format(independent_varname=independent_varname, source_table=source_table))[0]['fnum']
tmp_table = unique_string()
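    # The clustered computation above produces flat coef/std_err/z_stats/p_values
    # arrays spanning all categories; __mlogregr_format unrolls them into one
    # output row per category (relative to ref_category) before the final rename.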
plpy.execute("""
CREATE TABLE {tmp_table} AS
SELECT
({schema_madlib}.__mlogregr_format(
coef, {num_features},
{num_categories}, {ref_category})
).category AS category,
{ref_category} as ref_category,
({schema_madlib}.__mlogregr_format(
coef, {num_features},
{num_categories}, {ref_category})
).coef AS coef,
({schema_madlib}.__mlogregr_format(
std_err, {num_features},
{num_categories}, {ref_category})
).coef AS std_err,
({schema_madlib}.__mlogregr_format(
z_stats, {num_features},
{num_categories}, {ref_category})
).coef AS z_stats,
({schema_madlib}.__mlogregr_format(
p_values, {num_features},
{num_categories}, {ref_category})
).coef AS p_values
FROM
{out_table};
DROP TABLE IF EXISTS {out_table};
""".format(schema_madlib=schema_madlib, tmp_table=tmp_table,
num_features=num_features, num_categories=num_categories,
ref_category=ref_category, out_table=out_table))
# Rename the output table
    rename_table(schema_madlib, tmp_table, out_table)
plpy.execute("set client_min_messages to " + old_msg_level)
return None
# ========================================================================
def validate_args_clustered_variance_mlogregr(
schema_madlib, source_table, output_table, dependent_varname,
independent_varname, cluster_varname, grouping_col, ref_category,
verbose_mode, max_iter, optimizer, tolerance, **kwargs):
"""
Validate the parameters
"""
validate_args_clustered_variance(
schema_madlib, source_table, output_table, dependent_varname,
independent_varname, cluster_varname, grouping_col)
if ref_category is None:
plpy.error("Clustered variance error: Reference category "
"cannot be null!")
if ref_category < 0:
plpy.error("Clustered variance error: Reference category "
"cannot be negative!")
if not isinstance(verbose_mode, bool):
plpy.error("Clustered variance error: verbose_mode option "
"must be a boolean!")
if max_iter <= 0:
plpy.error("Clustered variance error: Maximum number of "
"iterations must be positive!")
if tolerance < 0:
plpy.error("Clustered variance error: The tolerance cannot "
"be negative!")
if optimizer not in ("newton", "irls"):
plpy.error(""" Clustered variance error: Unknown optimizer
requested. Must be 'newton'/'irls'.
""")
result_w_null = plpy.execute("""
SELECT DISTINCT {dep} AS cat
FROM {source}
WHERE {dep} is not NULL
""".format(source=source_table,
indep=independent_varname,
dep=dependent_varname))
result_wo_null = plpy.execute("""
SELECT DISTINCT {dep} AS cat
FROM {source}
WHERE {dep} is not NULL
AND NOT {madlib}.array_contains_null({indep})
""".format(madlib=schema_madlib, source=source_table,
indep=independent_varname,
dep=dependent_varname))
categories_wo_null = set(i["cat"] for i in result_wo_null)
categories_w_null = set(i["cat"] for i in result_w_null)
    _assert(categories_wo_null == categories_w_null,
            "Clustered variance error: For the categories {0}, every observation "
            "contains NULL values in the independent variables. These rows should "
            "be removed from the dataset before proceeding.".
            format(list(categories_w_null - categories_wo_null)))
# ========================================================================
def clustered_variance_mlogregr_help(schema_madlib, msg=None, **kwargs):
"""
Print help messages
"""
if msg is None or msg.strip(' ') in ('help', '?'):
return """
----------------------------------------------------------------
Summary
----------------------------------------------------------------
Computes the clustered standard errors for multi-logistic regression.
The function first runs a multi-logistic regression to get the fitting
coefficients. Then it computes the clustered standard errors for
the multi-logistic regression.
SELECT {schema_madlib}.clustered_variance_mlogregr(
'source_table',
'out_table',
'depvar',
'indvar',
'clustervar',
'ref_category',
'grouping_col',
max_iter,
'optimizer',
tolerance,
verbose_mode
);
--
Run:
SELECT {schema_madlib}.clustered_variance_mlogregr('usage');
to get more information.
""".format(schema_madlib=schema_madlib)
if msg.strip(' ') == 'usage':
return """
Usage:
----------------------------------------------------------------
SELECT {schema_madlib}.clustered_variance_mlogregr(
'source_table', -- Name of data table
'out_table', -- Name of result table (raise an error if it already exists)
'depvar', -- Expression for dependent variable
'indvar', -- Expression for independent variables
'clustervar', -- Column names for cluster variables, separated by comma
'ref_category', -- Reference category for the multi-logistic regression
'grouping_col', -- Grouping regression column names, separated by comma, default NULL
max_iter, -- Maximum iteration number for logistic regression, default 20
'optimizer', -- Optimization method for logistic regression, default 'irls'
            tolerance,          -- Stop the computation when the difference of likelihoods
                                -- in two consecutive iterations is smaller than this value. Default 0.0001
            verbose_mode        -- Whether to print detailed information when computing logistic regression,
                                -- default is False
);
Output:
----------------------------------------------------------------
The output table has the following columns:
ref_category INTEGER -- The reference category used in the multi-logistic regression
coef DOUBLE PRECISION[], -- Fitting coefficients
std_err DOUBLE PRECISION[], -- Clustered standard errors for coef
z_stats DOUBLE PRECISION[], -- z-stats of the errors
p_values DOUBLE PRECISION[] -- p-values of the errors
The output summary table is the same as mlogregr_train(), see also:
SELECT mlogregr_train('usage');
""".format(schema_madlib=schema_madlib)