import plpy

from elastic_net_utils import _process_results
from elastic_net_utils import _compute_log_likelihood
from utilities.validate_args import get_cols_and_types
from utilities.validate_args import quote_ident
from utilities.utilities import split_quoted_delimited_str
from internal.db_utils import quote_literal
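
# Helper routines that materialize the elastic_net output table (one model
# row per group) and the accompanying summary table once the optimizer has
# finished iterating.
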
def _elastic_net_generate_result(optimizer, iteration_run, **args):
    """
    Generate result table for all optimizers
    """
    standardize_flag = "True" if args["normalization"] else "False"
    source_table = args["rel_source"]
    data_scaled = False
    if args["normalization"] or optimizer == "igd":
        # x_mean_table and y_mean_table are created only in these conditions.
        data_scaled = True
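    # The __gaussian_*_result functions unpack the final optimizer state into
    # a composite value exposing "coefficients" and "intercept" fields.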
    if optimizer == "fista":
        result_func = "__gaussian_fista_result({0})".format(args["col_grp_state"])
    elif optimizer == "igd":
        result_func = """__gaussian_igd_result({col_grp_state},
                                '{sq_str}'::double precision[],
                                {threshold}::double precision,
                                {tolerance}::double precision)
                      """.format(col_grp_state=args["col_grp_state"],
                                 tolerance=args["warmup_tolerance"],
                                 threshold=args["threshold"],
                                 sq_str=args["sq_str"])
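    # The per-iteration optimizer state lives in rel_state; the final model
    # is read from the rows at iteration_run.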
    tbl_state = args["rel_state"]
    grouping_column = args['grouping_col']
    if grouping_column:
        col_grp_key = args['col_grp_key']
        grouping_str = args['grouping_str']
        cols_types = dict(get_cols_and_types(args["tbl_source"]))
        grouping_cols_list = split_quoted_delimited_str(grouping_column)
        grouping_str1 = ','.join(['{0} AS {1}'.format(g, quote_ident(g))
                                  for g in grouping_cols_list])
        select_mean_and_std = ''
        inner_join_x = ''
        inner_join_y = ''
        select_grp = ','.join(['n_tuples_including_nulls_subq.' + str(quote_ident(grp))
                               for grp in grouping_cols_list]) + ','
        select_grouping_info = ','.join([grp_col + "\t" + cols_types[grp_col]
                                         for grp_col in grouping_cols_list]) + ","
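        # When the data was standardized, join the per-group mean/std tables
        # back in so the coefficients can be restored to the original scale.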
        if data_scaled:
            x_grp_cols = ' AND '.join([
                '{0} = {1}.{2}'.format(
                    'n_tuples_including_nulls_subq.' + str(quote_ident(grp)),
                    args["x_mean_table"], grp)
                for grp in grouping_cols_list])
            y_grp_cols = ' AND '.join([
                '{0} = {1}.{2}'.format(
                    'n_tuples_including_nulls_subq.' + str(quote_ident(grp)),
                    args["y_mean_table"], grp)
                for grp in grouping_cols_list])
            select_mean_and_std = (
                ' {0}.mean AS x_mean, '.format(args["x_mean_table"]) +
                ' {0}.mean AS y_mean, '.format(args["y_mean_table"]) +
                ' {0}.std AS x_std, '.format(args["x_mean_table"]))
            inner_join_x = ' INNER JOIN {0} ON {1} '.format(
                args["x_mean_table"], x_grp_cols)
            inner_join_y = ' INNER JOIN {0} ON {1} '.format(
                args["y_mean_table"], y_grp_cols)
        out_table_qstr = """
            SELECT
                {select_grp}
                {select_mean_and_std}
                (result).coefficients AS coef,
                (result).intercept AS intercept
            FROM
            (
                SELECT
                    {grouping_str1},
                    array_to_string(ARRAY[{grouping_str}], ',') AS {col_grp_key}
                FROM {source_table}
                GROUP BY {grouping_column}, {col_grp_key}
            ) n_tuples_including_nulls_subq
            INNER JOIN
            (
                SELECT {schema_madlib}.{result_func} AS result, {col_grp_key}
                FROM {tbl_state}
                WHERE {col_grp_iteration} = {iteration_run}
            ) t USING ({col_grp_key})
            {inner_join_x}
            {inner_join_y}
            """.format(schema_madlib=args["schema_madlib"],
                       col_grp_iteration=args["col_grp_iteration"], **locals())
    else:
        # It's a much simpler query when there is no grouping.
        grouping_cols_list = []
        select_grouping_info = ""
        out_table_qstr = """
            SELECT
                (result).coefficients AS coef,
                (result).intercept AS intercept
            FROM (
                SELECT {schema_madlib}.{result_func} AS result
                FROM {tbl_state}
                WHERE {col_grp_iteration} = {iteration_run}
            ) t
            """.format(result_func=result_func, tbl_state=tbl_state,
                       col_grp_iteration=args["col_grp_iteration"],
                       iteration_run=iteration_run,
                       schema_madlib=args["schema_madlib"])
    # Create the output table
    plpy.execute("DROP TABLE IF EXISTS {tbl_result}".format(**args))
    plpy.execute("""
        CREATE TABLE {tbl_result} (
            {select_grouping_info}
            family text,
            features text[],
            features_selected text[],
            coef_nonzero double precision[],
            coef_all double precision[],
            intercept double precision,
            log_likelihood double precision,
            standardize boolean,
            iteration_run integer)
        """.format(select_grouping_info=select_grouping_info, **args))
    result = plpy.execute(out_table_qstr)
    for res in result:
        build_output_table(res, grouping_column, grouping_cols_list,
                           standardize_flag, iteration_run, **args)

    # Create summary table, listing the grouping columns used.
    grouping_text = "NULL" if not grouping_column else grouping_column
    failed_groups = plpy.execute("""
        SELECT count(*) AS num_failed_groups
        FROM {0}
        WHERE coef_all IS NULL
        """.format(args['tbl_result']))[0]
    all_groups = plpy.execute("SELECT count(*) AS num_all_groups FROM {0}".
                              format(args['tbl_result']))[0]
    args.update(failed_groups)
    args.update(all_groups)
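    # $madlib_super_quote$...$madlib_super_quote$ is PostgreSQL dollar
    # quoting, used so that quote characters inside the stored expressions
    # survive verbatim.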
plpy.execute("""
CREATE TABLE {tbl_summary} AS
SELECT
'elastic_net'::varchar AS method,
'{tbl_source}'::varchar AS source_table,
'{tbl_result}'::varchar AS out_table,
$madlib_super_quote${col_dep_var}$madlib_super_quote$::varchar
AS dependent_varname,
$madlib_super_quote${col_ind_var}$madlib_super_quote$::varchar
AS independent_varname,
'{family}'::varchar AS family,
{alpha}::float AS alpha,
{lambda_value}::float AS lambda_value,
$madlib_super_quote${grouping_text}$madlib_super_quote$::varchar
AS grouping_col,
{num_all_groups}::integer AS num_all_groups,
{num_failed_groups}::integer AS num_failed_groups
""".format(grouping_text=grouping_text,
**args))
return None


def build_output_table(res, grouping_column, grouping_col_list,
                       standardize_flag, iteration_run, **args):
    """
    Insert model captured in "res" into the output table
    """
    r_coef = res["coef"]
    if r_coef:
        if args["normalization"]:
            if grouping_column:
                (coef, intercept) = _restore_scale(r_coef, res["intercept"],
                                                   args, res["x_mean"],
                                                   res["x_std"], res["y_mean"])
            else:
                (coef, intercept) = _restore_scale(r_coef,
                                                   res["intercept"], args)
        else:
            coef = r_coef
            intercept = res["intercept"]
        (features, features_selected, dense_coef, sparse_coef) = _process_results(
            coef, intercept, args["outstr_array"])
        log_likelihood = _compute_log_likelihood(r_coef, res["intercept"], **args)
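        # Build the leading list of grouping values for the INSERT; each
        # value is quoted as a SQL literal, in the output table's column order.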
        if grouping_column:
            grouping_info = ",".join([quote_literal(str(res[grp_col.strip()]))
                                      for grp_col in grouping_col_list
                                      if grp_col.strip() in res.keys()])
        else:
            grouping_info = ""
        if grouping_info:
            grouping_info += ', '
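        # The VALUES list must match the column order of the CREATE TABLE in
        # _elastic_net_generate_result: [grouping cols,] family, features,
        # features_selected, coef_nonzero, coef_all, intercept,
        # log_likelihood, standardize, iteration_run.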
        fquery = """
            INSERT INTO {tbl_result} VALUES
            ({grouping_info} '{family}', '{features}'::text[], '{features_selected}'::text[],
             '{dense_coef}'::double precision[], '{sparse_coef}'::double precision[],
             {intercept}, {log_likelihood}, {standardize_flag}, {iteration})
            """.format(features=features, features_selected=features_selected,
                       dense_coef=dense_coef, sparse_coef=sparse_coef,
                       intercept=intercept, log_likelihood=log_likelihood,
                       grouping_info=grouping_info,
                       standardize_flag=standardize_flag, iteration=iteration_run,
                       **args)
        plpy.execute(fquery)
# ------------------------------------------------------------------------
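# The model is fit on standardized data x' = (x - mean) / std, so the
# original-scale coefficients are recovered as coef[i] / std[i], with the
# intercept absorbing the shifted means:
#     intercept = y_mean - sum_i coef[i] * x_mean[i] / x_std[i]   (gaussian)
# For binomial models the fitted intercept is adjusted the same way.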
def _restore_scale(coef, intercept, args,
                   x_mean=None, x_std=None, y_mean=None):
    """
    Restore the original scales
    """
    if x_mean is None and x_std is None and y_mean is None:
        x_mean = args["x_scales"]["mean"]
        y_mean = args["y_scale"]["mean"]
        x_std = args["x_scales"]["std"]
    rcoef = [0] * len(coef)
    if args["family"] == "gaussian":
        rintercept = float(y_mean)
    elif args["family"] == "binomial":
        rintercept = float(intercept)
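    # A zero std means the feature was constant; leave its coefficient at 0
    # rather than dividing by zero.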
    for i in range(len(coef)):
        if x_std[i] != 0:
            rcoef[i] = coef[i] / x_std[i]
            rintercept -= (coef[i] * x_mean[i] / x_std[i])
    return (rcoef, rintercept)