import plpy
import math
import re
from utilities.utilities import _string_to_array
from utilities.utilities import _array_to_string
from convex.utils_regularization import __utils_ind_var_scales
from convex.utils_regularization import __utils_dep_var_scale
from convex.utils_regularization import __utils_normalize_data
from utilities.validate_args import table_exists
from utilities.control import IterationController2S
from collections import namedtuple
# ------------------------------------------------------------------------
# -- constants -----------------------------------------------------------
# The constants below are defined so that they can be used like enums:
#     'igd' in OPTIMIZERS          (returns True)
#     'igd' == OPTIMIZERS.igd      (returns True)
BINOMIAL_FAMILIES = namedtuple("bin", ("binomial logistic"))('binomial', 'logistic')
GAUSSIAN_FAMILIES = namedtuple("gau", ("gaussian linear"))('gaussian', 'linear')
OPTIMIZERS = namedtuple("opt", ("igd fista"))('igd', 'fista')
# -------------------------------------------------------------------------
def _process_results(coef, intercept, outstr_array):
"""
Return features, features_selected, dense_coef
"""
if not outstr_array:
raise ValueError("Invalid feature name array: {0}".format(str(outstr_array)))
if not coef:
raise ValueError("Invalid coef array: {0}".format(str(coef)))
features = _array_to_string(outstr_array)
selected_array = []
dense_coef = []
for i in range(len(coef)):
if coef[i] != 0:
selected_array.append(outstr_array[i])
dense_coef.append(coef[i])
features_selected = _array_to_string(selected_array)
dense_coef = _array_to_string(dense_coef)
return (features, features_selected, dense_coef, _array_to_string(coef))
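# A minimal sketch (hypothetical values) of what _process_results returns:
# with coef = [0.5, 0.0, -1.2] and outstr_array = ['x1', 'x2', 'x3'], only
# the non-zero coefficients survive in the selected outputs, e.g.
#     features          -> 'x1,x2,x3'
#     features_selected -> 'x1,x3'
#     dense_coef        -> '0.5,-1.2'
# (the exact delimiters depend on _array_to_string's formatting)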
# ------------------------------------------------------------------------
def _process_warmup_lambdas(lambdas, lambda_value):
"""
Convert the string of warmup_lambdas into an double array
@param lambdas The string which will be converted to an array
@param lambda_value The target value of lambda, which must be equal to
the last element of the input array
"""
matched = re.match(r"^[\[\{\(](.*)[\]\}\)]$", lambdas)
if matched is None:
plpy.error("Elastic Net error: warmup_lambdas must be NULL or something like {3,2,1} !")
elm = _string_to_array(matched.group(1))
elm = [float(i) for i in elm]
if elm[- 1] != lambda_value:
plpy.error("""
Elastic Net error: The last element of warmup_lambdas must
be equal to the lambda value that you want to compute !
""")
if len(elm) > 1:
for i in range(len(elm) - 1):
if elm[i] <= elm[i + 1]:
plpy.error("""
Elastic Net error: The given warm-up array must be
in a strict descent order.
""")
return elm
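# For example (illustrative values only), "{3, 2, 1, 0.1}" with
# lambda_value = 0.1 parses to [3.0, 2.0, 1.0, 0.1], while "{1, 2, 0.1}"
# raises an error because the values are not strictly descending.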
# ------------------------------------------------------------------------
def _generate_warmup_lambda_sequence(lambda_value, n_steps):
"""
Compute lambda sequence, when warmup is True and warmup_lambdas are
not given
"""
if n_steps == 1:
return [lambda_value]
largest = 1e5
if abs(lambda_value - 0.) < 1e-6:
zero_lambda = True
smallest = 0.001 * largest
n_steps -= 1
else:
smallest = lambda_value
zero_lambda = False
smallest, largest = min(smallest, largest), max(smallest, largest)
step = math.log(smallest / largest) / (float(n_steps) - 1)
constant = math.log(largest)
seq = [math.exp(j * step + constant) for j in range(n_steps)]
if zero_lambda:
seq.append(0.)
return seq
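# The sequence is geometrically spaced from 1e5 down to the target lambda.
# For example (values rounded), a hypothetical call
#     _generate_warmup_lambda_sequence(0.01, 3)
# yields [1e5, 31.62, 0.01], each step multiplying by sqrt(0.01 / 1e5);
# a zero target instead stops at 0.001 * 1e5 = 100 and appends 0.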
# ------------------------------------------------------------------------
def _compute_average_sq(**args):
"""
Compute the average squares of all features, used to estimtae the largest lambda
Actually only the largest value is used, so order does not matter here
"""
sq = [1] * args["dimension"]
if args["normalization"] is False:
for i in range(args["dimension"]):
sq[i] = (args["x_scales"]["std"][i]) ** 2 + (args["x_scales"]["mean"][i]) ** 2
return sq
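# For instance, with normalization off and
#     x_scales = {"mean": [1., 2.], "std": [0.5, 1.]}
# this returns [0.5**2 + 1.**2, 1.**2 + 2.**2] = [1.25, 5.0]; with
# normalization on, every feature's average square is taken to be 1.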
# ------------------------------------------------------------------------
def _compute_log_likelihood(coef, intercept, **args):
"""
Compute the log-likelihood at the end of calculation
"""
if args["family"] == "gaussian": # linear models
loss = plpy.execute(
"""
select
avg(({col_dep_var_new} - {schema_madlib}.elastic_net_gaussian_predict(
'{coefficients}'::double precision[],
{intercept}::double precision,
{col_ind_var_new}))^2) / 2.
as loss
from
{tbl_used}
""".format(coefficients=_array_to_string(coef),
intercept=intercept,
**args))[0]["loss"]
elif args["family"] == "binomial": # logistic models
loss = plpy.execute(
"""
select
avg({schema_madlib}.__elastic_net_binomial_loglikelihood(
'{coefficients}'::double precision[],
{intercept},
{col_dep_var_new},
{col_ind_var_new}))
as loss
from {tbl_used}
""".format(coefficients=_array_to_string(coef),
intercept=intercept,
**args))[0]["loss"]
module_1 = sum(x * x for x in coef) / 2.
module_2 = sum(abs(x) for x in coef)
log_likelihood = - (loss + args["lambda_value"] *
((1 - args["alpha"]) * module_1 + args["alpha"] * module_2))
return log_likelihood
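# In formula form, with the mean loss L computed above, the value returned is
#     -(L + lambda * ((1 - alpha) * ||coef||_2^2 / 2 + alpha * ||coef||_1))
# i.e. the negative of the elastic-net penalized loss.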
# ------------------------------------------------------------------------
def _elastic_net_validate_args(tbl_source, col_ind_var, col_dep_var,
                               tbl_result, tbl_summary, lambda_value, alpha,
                               normalization, max_iter, tolerance):
    if (any(i is None for i in (lambda_value, alpha, normalization)) or
            any(not i for i in (tbl_source, col_ind_var, col_dep_var, tbl_result))):
        plpy.error("Elastic Net error: You have unsupported NULL/empty value(s) in the arguments!")
    if table_exists(tbl_result, only_first_schema=True):
        plpy.error("Elastic Net error: Output table " + tbl_result + " already exists!")
    if table_exists(tbl_summary, only_first_schema=True):
        plpy.error("Elastic Net error: Output summary table " + tbl_summary + " already exists!")
    if lambda_value < 0:
        plpy.error("Elastic Net error: The regularization parameter lambda cannot be negative!")
    if alpha < 0 or alpha > 1:
        plpy.error("Elastic Net error: The elastic net control parameter alpha must be in [0,1]!")
    if max_iter <= 0:
        plpy.error("Elastic Net error: max_iter must be positive!")
    if tolerance < 0:
        plpy.error("Elastic Net error: tolerance cannot be negative!")
    return None
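# As a usage note, alpha = 1 corresponds to pure lasso (l1) and alpha = 0 to
# pure ridge (l2) regularization; any alpha outside [0, 1] is rejected above.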
# ------------------------------------------------------------------------
def _compute_data_scales(args):
args["x_scales"] = __utils_ind_var_scales(tbl_data=args["tbl_source"], col_ind_var=args["col_ind_var"],
dimension=args["dimension"], schema_madlib=args["schema_madlib"])
if args["family"] == "binomial":
args["y_scale"] = dict(mean=0, std=1)
else:
args["y_scale"] = __utils_dep_var_scale(schema_madlib=args["schema_madlib"], tbl_data=args["tbl_source"],
col_ind_var=args["col_ind_var"], col_dep_var=args["col_dep_var"])
args["xmean_str"] = _array_to_string(args["x_scales"]["mean"])
# ------------------------------------------------------------------------
def _normalize_data(args):
"""
Compute the scaling factors for independent and dependent
variables, and then scale the original data.
The output is stored in tbl_data_scaled
"""
_compute_data_scales(args)
y_decenter = True if args["family"] == "gaussian" else False
__utils_normalize_data(y_decenter=y_decenter,
tbl_data=args["tbl_source"],
col_ind_var=args["col_ind_var"],
col_dep_var=args["col_dep_var"],
tbl_data_scaled=args["tbl_data_scaled"],
col_ind_var_norm_new=args["col_ind_var_norm_new"],
col_dep_var_norm_new=args["col_dep_var_norm_new"],
schema_madlib=args["schema_madlib"],
x_mean_str=args["xmean_str"],
x_std_str=_array_to_string(args["x_scales"]["std"]),
y_mean=args["y_scale"]["mean"],
y_std=args["y_scale"]["std"],
grouping_col=args["grouping_col"])
return None
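# A sketch of the expected effect (assuming __utils_normalize_data performs
# standard scaling): each feature x_i becomes (x_i - mean_i) / std_i in
# tbl_data_scaled, and for the gaussian family the response is de-centered
# to y - mean(y); binomial responses are left as-is, since y_scale is fixed
# to mean 0 and std 1 in _compute_data_scales.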
# ------------------------------------------------------------------------
def _tbl_dimension_rownum(schema_madlib, tbl_source, col_ind_var):
"""
Measure the dimension and row number of source data table
"""
# independent variable array length
dimension = plpy.execute("""
select array_upper({col_ind_var},1) as dimension
from {tbl_source} limit 1
""".format(tbl_source=tbl_source,
col_ind_var=col_ind_var))[0]["dimension"]
# total row number of data source table
# The WHERE clause here ignores rows in the table that contain one or more NULLs in the
# independent variable (x). There is no NULL check made for the dependent variable (y),
# since one of the hard requirements/assumptions of the input data to elastic_net is that the
# dependent variable cannot be NULL.
row_num = plpy.execute("""
select count(*) from {tbl_source}
WHERE not {schema_madlib}.array_contains_null({col_ind_var})
""".format(tbl_source=tbl_source,
schema_madlib=schema_madlib,
col_ind_var=col_ind_var))[0]["count"]
return (dimension, row_num)
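# For a hypothetical table 'data' whose independent variable column 'x' is a
# double precision[] of length 3, holding 100 NULL-free rows, the call
#     _tbl_dimension_rownum('madlib', 'data', 'x')
# would return (3, 100).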
# ------------------------------------------------------------------------
def _compute_means(**args):
"""
Compute the averages of dependent (y) and independent (x) variables
"""
if args["normalization"]:
xmean_str = _array_to_string([0] * args["dimension"])
ymean = 0
return (xmean_str, ymean)
else:
return (args["xmean_str"], args["y_scale"]["mean"])
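# When normalization is on, the scaled data has zero mean by construction, so
# a zero vector and a zero scalar are returned; otherwise the means computed
# by _compute_data_scales are passed through unchanged.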
# ------------------------------------------------------------------------
class IterationControllerNoTableDrop(IterationController2S):
    """
    IterationController, but without table dropping.
    Useful for cross validation, where dropping tables inside a loop
    would use up all the locks and trigger an "out of memory" error.
    """
    # ------------------------------------------------------------------------
    def __init__(self, rel_args, rel_state, stateType,
                 temporaryTables=True,
                 truncAfterIteration=False,
                 schema_madlib="MADLIB_SCHEMA_MISSING",
                 verbose=False,
                 **kwargs):
        # call the super class's __init__ to initialize member fields
        super(IterationControllerNoTableDrop, self).__init__(
            rel_args, rel_state, stateType, temporaryTables,
            truncAfterIteration, schema_madlib, verbose, **kwargs)
        # self.kwargs["rel_state"] is "pg_temp" + rel_state, but to test
        # for the existence of a table, the schema name must be used as well
        self.state_exists = plpy.execute(
            "select count(*) from information_schema.tables "
            "where table_name = '{0}' and table_schema = 'pg_temp'".
            format(rel_state))[0]['count'] == 1
        # the current total row number of the rel_state table
        if self.state_exists:
            self.state_row_num = plpy.execute(
                "select count(*) from {rel_state}".format(**self.kwargs))[0]["count"]
    # ------------------------------------------------------------------------
    def update(self, newState):
        """
        Update the state of the calculation.
        """
        newState = newState.format(iteration=self.iteration, **self.kwargs)
        self.iteration += 1
        if self.state_exists and self.iteration <= self.state_row_num:
            # If the rel_state table already exists and the iteration
            # number does not exceed the total row number, UPDATE the row
            # in place instead of appending. UPDATE does not take extra
            # locks.
            self.runSQL("""
                update {rel_state} set _state = ({newState})
                where _iteration = {iteration}
                """.format(iteration=self.iteration,
                           newState=newState,
                           **self.kwargs))
        else:
            # the rel_state table is newly created; append data to it
            self.runSQL("""
                INSERT INTO {rel_state}
                SELECT
                    {iteration},
                    ({newState})
                """.format(iteration=self.iteration,
                           newState=newState,
                           **self.kwargs))
    # ------------------------------------------------------------------------
    def __enter__(self):
        """
        __enter__ and __exit__ are special methods: they are called
        automatically when entering and leaving a "with" block.
        """
        if self.state_exists is False:
            # create the rel_state table only when it does not already exist
            super(IterationControllerNoTableDrop, self).__enter__()
        self.inWith = True
        return self
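# A hedged sketch of how such a controller is typically driven (the SQL
# string and loop condition are illustrative, not the actual caller):
#
#     with IterationControllerNoTableDrop(rel_args, rel_state, stateType,
#                                         schema_madlib=schema) as it:
#         while it.iteration < max_iter:
#             it.update("<SQL expression producing the new state>")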
# ------------------------------------------------------------------------
class IterationControllerTableAppend(IterationControllerNoTableDrop):
    def __init__(self, rel_args, rel_state, stateType,
                 temporaryTables=True,
                 truncAfterIteration=False,
                 schema_madlib="MADLIB_SCHEMA_MISSING",
                 verbose=False,
                 **kwargs):
        self.kwargs = kwargs
        self.kwargs.update(
            rel_args=rel_args,
            rel_state=rel_state,
            stateType=stateType.format(schema_madlib=schema_madlib),
            schema_madlib=schema_madlib)
        self.temporaryTables = temporaryTables
        self.truncAfterIteration = truncAfterIteration
        self.verbose = verbose
        self.inWith = False
        self.iteration = -1
        self.state_exists = plpy.execute("""
            select count(*)
            from information_schema.tables
            where table_name = '{rel_state}'
            """.format(**self.kwargs))[0]['count'] == 1
    # ------------------------------------------------------------------------
    def update(self, newState):
        """
        Update state of calculation.
        """
        newState = newState.format(iteration=self.iteration, **self.kwargs)
        self.iteration += 1
        self.runSQL("""
            INSERT INTO {rel_state}
            SELECT
                {iteration},
                ({newState})
            """.format(iteration=self.iteration,
                       newState=newState,
                       **self.kwargs))
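# Note that, unlike the parent class, update() here always INSERTs a new row,
# so rel_state grows by one row per call: since self.iteration starts at -1
# and is incremented before the INSERT, three updates leave rows for
# iterations 0, 1 and 2.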