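"""
Elastic net regularization solved with the IGD (incremental gradient
descent) optimizer: optimizer-parameter parsing, construction of the
argument dictionary shared by the SQL templates, cleanup of temporary
tables, and the driver that runs the IGD iterations, optionally with a
warm-up pass over a sequence of lambda values.
"""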
import plpy

from elastic_net_generate_result import _elastic_net_generate_result
from elastic_net_utils import _compute_average_sq
from elastic_net_utils import _compute_data_scales
from elastic_net_utils import _compute_means
from elastic_net_utils import _elastic_net_validate_args
from elastic_net_utils import _generate_warmup_lambda_sequence
from elastic_net_utils import _normalize_data
from elastic_net_utils import _process_warmup_lambdas
from elastic_net_utils import _tbl_dimension_rownum
from utilities.control import MinWarning
from utilities.in_mem_group_control import GroupIterationController
from utilities.utilities import _array_to_string
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string
# ------------------------------------------------------------------------
def _igd_params_parser(optimizer_params, lambda_value, tolerance, schema_madlib):
"""
Parse IGD parameters.
"""
# default values
defaults_and_types = {
"stepsize": (0.01, float),
"warmup": (False, bool),
"warmup_lambdas": (None, list),
"warmup_lambda_no": (15, int),
"threshold": (1e-10, float),
"parallel": (True, bool),
"step_decay": (0.0, float),
"warmup_tolerance": (tolerance, float)
}
    param_defaults = {k: v[0] for k, v in defaults_and_types.items()}
    param_types = {k: v[1] for k, v in defaults_and_types.items()}
if not optimizer_params:
return param_defaults
    usage_str = ("\n Run:\n"
                 "   SELECT {0}.elastic_net_train('igd');\n"
                 " to see the parameters for the IGD algorithm.".
                 format(schema_madlib))
name_value = extract_keyvalue_params(optimizer_params, param_types,
param_defaults,
usage_str=usage_str,
ignore_invalid=True)
if name_value["warmup"] and name_value['warmup_lambdas'] is not None:
# errors are handled in _process_warmup_lambdas
name_value['warmup_lambdas'] = _process_warmup_lambdas(name_value['warmup_lambdas'], lambda_value)
# validate the parameters
if name_value["step_decay"] < 0:
plpy.error("Elastic Net error: step_decay must be non-negative!")
if name_value["stepsize"] <= 0:
plpy.error("Elastic Net error: step size must be positive!")
if name_value["warmup_tolerance"] <= 0:
plpy.error("Elastic Net error: warmup_tolerance must be positive!")
if (name_value["warmup"] and name_value["warmup_lambdas"] is None and
name_value["warmup_lambda_no"] < 1):
plpy.error("Elastic Net error: Number of warm-up lambdas must be a positive integer!")
if name_value["threshold"] < 0:
plpy.error("Elastic Net error: A positive threshold is needed to screen out tiny values around zero!")
return name_value
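# Illustrative call (hypothetical values, for documentation only):
#   params = _igd_params_parser('stepsize = 0.05, warmup = True',
#                               lambda_value=0.1, tolerance=1e-6,
#                               schema_madlib='madlib')
#   params['stepsize']   # -> 0.05; keys not given keep their defaults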
# ------------------------------------------------------------------------
def _igd_construct_dict(schema_madlib, family, tbl_source,
col_ind_var, col_dep_var,
tbl_result, dimension, row_num, lambda_value, alpha,
normalization, max_iter, tolerance, outstr_array,
optimizer_params_dict):
"""
Construct the dict used by a series of SQL queries in IGD optimizer.
"""
args = dict(schema_madlib=schema_madlib,
family=family,
tbl_source=tbl_source,
tbl_data=tbl_source, # argument name used in normalization
col_ind_var=col_ind_var, col_dep_var=col_dep_var,
col_ind_var_norm_new=unique_string(desp='temp_features_norm'), # for normalization usage
col_ind_var_tmp=unique_string(desp='temp_features'),
col_dep_var_norm_new=unique_string(desp='temp_target_norm'), # for normalization usage
col_dep_var_tmp=unique_string(desp='temp_target'),
tbl_result=tbl_result,
lambda_value=lambda_value, alpha=alpha,
dimension=dimension, row_num=row_num,
max_iter=max_iter, tolerance=tolerance,
outstr_array=outstr_array,
normalization=normalization)
# Add the optimizer parameters
args.update(optimizer_params_dict)
    # Table names useful when normalizing the original data.
    # Note: to stay consistent with the calling convention of the
    # normalization functions, several entries of this dict hold the
    # same value. That is the price of saving typed-out argument names
    # by passing **args to those functions.
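    # For example, _normalize_data(args) and _compute_data_scales(args)
    # look up the input table under the key 'tbl_data', which is why the
    # same table name appears under both 'tbl_source' and 'tbl_data'.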
tbl_ind_scales = unique_string(desp='temp_ind_scales')
tbl_dep_scale = unique_string(desp='temp_dep_scales')
tbl_data_scaled = unique_string(desp='temp_data_scales')
args.update(tbl_dep_scale=tbl_dep_scale,
tbl_ind_scales=tbl_ind_scales,
tbl_data_scaled=tbl_data_scaled)
# Table names used in IGD iterations
args.update(tbl_igd_state=unique_string(),
tbl_igd_args=unique_string())
    # more unique names, used in the args table
args["dimension_name"] = unique_string()
args["stepsize_name"] = unique_string()
args["lambda_name"] = unique_string()
args["alpha_name"] = unique_string()
args["total_rows_name"] = unique_string()
args["max_iter_name"] = unique_string()
args["tolerance_name"] = unique_string()
args["xmean_name"] = unique_string()
args["ymean_name"] = unique_string()
return args
# ------------------------------------------------------------------------
def _igd_cleanup_temp_tbls(**args):
"""
Drop all temporary tables used by IGD optimizer,
including tables used in the possible normalization
and IGD iterations.
"""
plpy.execute("""
drop table if exists {tbl_ind_scales};
drop table if exists {tbl_dep_scale};
drop table if exists {tbl_data_scaled};
drop table if exists {tbl_igd_args};
drop table if exists pg_temp.{tbl_igd_state};
drop table if exists {x_mean_table};
drop table if exists {y_mean_table};
""".format(**args))
return None
# ------------------------------------------------------------------------
def _elastic_net_igd_train(schema_madlib, func_step_aggregate,
func_state_diff, family,
tbl_source, col_ind_var,
col_dep_var, tbl_result, tbl_summary, lambda_value, alpha,
normalization, optimizer_params, max_iter,
tolerance, outstr_array, grouping_str,
grouping_col, **kwargs):
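    """
    Validate the input arguments, then delegate to
    _elastic_net_igd_train_compute to fit the model.
    """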
_elastic_net_validate_args(tbl_source, col_ind_var, col_dep_var, tbl_result, tbl_summary,
lambda_value, alpha, normalization, max_iter, tolerance)
return _elastic_net_igd_train_compute(schema_madlib, func_step_aggregate,
func_state_diff, family,
tbl_source, col_ind_var,
col_dep_var, tbl_result, tbl_summary, lambda_value, alpha,
normalization, optimizer_params, max_iter,
tolerance, outstr_array, grouping_str,
grouping_col, **kwargs)
# ------------------------------------------------------------------------
def _elastic_net_igd_train_compute(schema_madlib, func_step_aggregate,
func_state_diff, family,
tbl_source, col_ind_var,
col_dep_var, tbl_result, tbl_summary, lambda_value, alpha,
normalization, optimizer_params, max_iter,
tolerance, outstr_array, grouping_str,
grouping_col, **kwargs):
"""
Fit linear model with elastic net regularization using IGD optimization.
@param tbl_source Name of data source table
@param col_ind_var Name of independent variable column,
independent variable is an array
@param col_dep_var Name of dependent variable column
@param tbl_result Name of the table to store the results,
will return fitting coefficients and
likelihood
@param lambda_value The regularization parameter
@param alpha The elastic net parameter, [0, 1]
@param normalization Whether to normalize the variables
@param optimizer_params Parameters of the above optimizer, the format
is '{arg = value, ...}'::varchar[]
"""
with MinWarning('error'):
(dimension, row_num) = _tbl_dimension_rownum(schema_madlib,
tbl_source, col_ind_var)
        # build one dict that the following SQL format strings draw from,
        # including several temporary table names
args = _igd_construct_dict(schema_madlib, family, tbl_source,
col_ind_var, col_dep_var, tbl_result, dimension, row_num,
lambda_value, alpha, normalization, max_iter, tolerance,
outstr_array, _igd_params_parser(optimizer_params, lambda_value,
tolerance, schema_madlib))
        args.update(x_mean_table=unique_string(desp='x_mean_table'),
                    y_mean_table=unique_string(desp='y_mean_table'),
                    grouping_col=grouping_col)
# use normalized data or not
if normalization:
_normalize_data(args)
tbl_used = args["tbl_data_scaled"]
args["col_ind_var_new"] = args["col_ind_var_norm_new"]
args["col_dep_var_new"] = args["col_dep_var_norm_new"]
else:
_compute_data_scales(args)
tbl_used = tbl_source
args["col_ind_var_new"] = col_ind_var
args["col_dep_var_new"] = col_dep_var
args["tbl_used"] = tbl_used
# parameter values required by the IGD optimizer
(xmean, ymean) = _compute_means(args)
        # the average square of each feature is used to estimate the
        # largest lambda value, and also to screen out tiny values,
        # so the ordering of the array matters
args["sq"] = _compute_average_sq(**args)
args["sq_str"] = _array_to_string(args["sq"])
if args["warmup_lambdas"] is not None:
args["warm_no"] = len(args["warmup_lambdas"])
if args["warmup"] and args["warmup_lambdas"] is None:
args["warmup_lambdas"] = \
_generate_warmup_lambda_sequence(lambda_value,
args["warmup_lambda_no"])
args["warm_no"] = len(args["warmup_lambdas"])
elif args["warmup"] is False:
args["warm_no"] = 1
args["warmup_lambdas"] = [lambda_value] # only one value
args.update({
'rel_args': args["tbl_igd_args"],
'rel_state': args["tbl_igd_state"],
'col_grp_iteration': unique_string(desp='col_grp_iteration'),
'col_grp_state': unique_string(desp='col_grp_state'),
'col_grp_key': unique_string(desp='col_grp_key'),
'col_n_tuples': unique_string(desp='col_n_tuples'),
'lambda_count': 1,
'state_type': "double precision[]",
'rel_source': tbl_used,
'grouping_str': grouping_str,
'xmean_val': xmean,
'ymean_val': ymean,
'tbl_source': tbl_source,
'tbl_summary': tbl_summary
})
        # with parallel=False, use the single-segment variant of the
        # step aggregate (the IGD updates then run on one segment only)
        if not args.get('parallel'):
            func_step_aggregate += "_single_seg"
# perform the actual calculation
iteration_run = _compute_igd(
schema_madlib,
func_step_aggregate,
func_state_diff,
args["tbl_igd_args"],
args["tbl_igd_state"],
tbl_used,
args["col_ind_var_new"],
args["col_dep_var_new"],
grouping_str,
grouping_col,
start_iter=0,
max_iter=args["max_iter"],
tolerance=args["tolerance"],
warmup_tolerance=args["warmup_tolerance"],
warm_no=args["warm_no"],
step_decay=args["step_decay"],
dimension=args["dimension"],
stepsize=args["stepsize"],
            lambda_name=args["warmup_lambdas"],  # the full warm-up lambda sequence
            warmup_lambda_value=args.get('warmup_lambdas')[args["lambda_count"] - 1],
alpha=args["alpha"],
row_num=args["row_num"],
xmean_val=args["xmean_val"],
ymean_val=args["ymean_val"],
lambda_count=args["lambda_count"],
rel_state=args["tbl_igd_state"],
col_grp_iteration=args["col_grp_iteration"],
col_grp_state=args["col_grp_state"],
col_grp_key=args["col_grp_key"],
col_n_tuples=args["col_n_tuples"],
rel_source=args["rel_source"],
state_type=args["state_type"])
_elastic_net_generate_result("igd", iteration_run, **args)
# cleanup
_igd_cleanup_temp_tbls(**args)
return None
# ------------------------------------------------------------------------
def _compute_igd(schema_madlib, func_step_aggregate, func_state_diff,
tbl_args, tbl_state, tbl_source,
col_ind_var, col_dep_var, grouping_str, grouping_col,
start_iter, **kwargs):
"""
Driver function for elastic net with Gaussian response using IGD
@param schema_madlib Name of the MADlib schema, properly escaped/quoted
@param tbl_args Name of the (temporary) table containing all non-template
arguments
@param tbl_state Name of the (temporary) table containing the inter-iteration
states
@param rel_source Name of the relation containing input points
@param col_ind_var Name of the independent variables column
@param col_dep_var Name of the dependent variable column
@param kwargs We allow the caller to specify additional arguments (all of
which will be ignored though). The purpose of this is to allow the
caller to unpack a dictionary whose element set is a superset of
the required arguments by this function.
@return The iteration number (i.e., the key) with which to look up the
result in \c tbl_state
"""
    args = locals()
    for k, v in kwargs.items():
        if k not in args:
            args[k] = v
iterationCtrl = GroupIterationController(args)
with iterationCtrl as it:
it.iteration = start_iter
while True:
            # stop once every lambda in the warm-up sequence has been used
            if it.kwargs["lambda_count"] > len(args.get('lambda_name')):
                break
            it.kwargs["warmup_lambda_value"] = \
                args.get('lambda_name')[it.kwargs["lambda_count"] - 1]
            # Fix for JIRA MADLIB-1092:
            # 'col_n_tuples' is supposed to hold the number of rows in the
            # table, or in the current group. It gets the right value in
            # in_mem_group_control, so it is used here instead of row_num
            # (which was used previously).
it.update("""
{schema_madlib}.{func_step_aggregate}(
({col_ind_var})::double precision[],
({col_dep_var}),
{rel_state}.{col_grp_state},
({warmup_lambda_value})::double precision,
({alpha})::double precision,
({dimension})::integer,
({stepsize})::double precision,
({col_n_tuples})::integer,
('{xmean_val}')::double precision[],
({ymean_val})::double precision,
({step_decay})::double precision
)
""")
if it.kwargs["lambda_count"] < it.kwargs["warm_no"]:
it.kwargs["use_tolerance"] = it.kwargs["warmup_tolerance"]
else:
it.kwargs["use_tolerance"] = it.kwargs["tolerance"]
if it.test("""
{iteration} > {max_iter} or
{schema_madlib}.{func_state_diff}(
_state_previous, _state_current) < {use_tolerance}
"""):
if (it.iteration < it.kwargs["max_iter"] and
it.kwargs["lambda_count"] < it.kwargs["warm_no"]):
it.kwargs["lambda_count"] += 1
else:
break
it.final()
if it.kwargs["lambda_count"] < it.kwargs["warm_no"]:
plpy.error("""
Elastic Net error: The final target lambda value is not
reached in warm-up iterations. You need more iterations!
""")
return iterationCtrl.iteration