| |
| import plpy |
| from utilities.utilities import unique_string |
| from utilities.in_mem_group_control import GroupIterationController |
| from elastic_net_utils import _compute_means |
| from elastic_net_utils import _normalize_data |
| from elastic_net_utils import _compute_data_scales |
| from elastic_net_utils import _tbl_dimension_rownum |
| from elastic_net_utils import _elastic_net_validate_args |
| from utilities.utilities import _array_to_string |
| from elastic_net_utils import _compute_average_sq |
| from elastic_net_utils import _generate_warmup_lambda_sequence |
| from elastic_net_utils import _process_warmup_lambdas |
| from elastic_net_generate_result import _elastic_net_generate_result |
| from utilities.utilities import extract_keyvalue_params |
| from utilities.control import MinWarning |
| |
| # ------------------------------------------------------------------------ |
| |
| |
| def _igd_params_parser(optimizer_params, lambda_value, tolerance, schema_madlib): |
| """ |
| Parse IGD parameters. |
| """ |
| # default values |
| defaults_and_types = { |
| "stepsize": (0.01, float), |
| "warmup": (False, bool), |
| "warmup_lambdas": (None, list), |
| "warmup_lambda_no": (15, int), |
| "threshold": (1e-10, float), |
| "parallel": (True, bool), |
| "step_decay": (0.0, float), |
| "warmup_tolerance": (tolerance, float) |
| } |
| param_defaults = dict([(k, v[0]) for k, v in defaults_and_types.items()]) |
| param_types = dict([(k, v[1]) for k, v in defaults_and_types.items()]) |
| |
| if not optimizer_params: |
| return param_defaults |
| |
| usage_str = ("\n Run:\n" |
| " SELECT {0}.elastic_net_train('igd');\n" |
| " to see the parameters for FISTA algorithm.". |
| format(schema_madlib)) |
| name_value = extract_keyvalue_params(optimizer_params, param_types, |
| param_defaults, |
| usage_str=usage_str, |
| ignore_invalid=True) |
| |
| if name_value["warmup"] and name_value['warmup_lambdas'] is not None: |
| # errors are handled in _process_warmup_lambdas |
| name_value['warmup_lambdas'] = _process_warmup_lambdas(name_value['warmup_lambdas'], lambda_value) |
| |
| # validate the parameters |
| if name_value["step_decay"] < 0: |
| plpy.error("Elastic Net error: step_decay must be non-negative!") |
| |
| if name_value["stepsize"] <= 0: |
| plpy.error("Elastic Net error: step size must be positive!") |
| |
| if name_value["warmup_tolerance"] <= 0: |
| plpy.error("Elastic Net error: warmup_tolerance must be positive!") |
| |
| if (name_value["warmup"] and name_value["warmup_lambdas"] is None and |
| name_value["warmup_lambda_no"] < 1): |
| plpy.error("Elastic Net error: Number of warm-up lambdas must be a positive integer!") |
| |
| if name_value["threshold"] < 0: |
| plpy.error("Elastic Net error: A positive threshold is needed to screen out tiny values around zero!") |
| |
| return name_value |
| # ------------------------------------------------------------------------ |
| |
| |
| def _igd_construct_dict(schema_madlib, family, tbl_source, |
| col_ind_var, col_dep_var, |
| tbl_result, dimension, row_num, lambda_value, alpha, |
| normalization, max_iter, tolerance, outstr_array, |
| optimizer_params_dict): |
| """ |
| Construct the dict used by a series of SQL queries in IGD optimizer. |
| """ |
| args = dict(schema_madlib=schema_madlib, |
| family=family, |
| tbl_source=tbl_source, |
| tbl_data=tbl_source, # argument name used in normalization |
| col_ind_var=col_ind_var, col_dep_var=col_dep_var, |
| col_ind_var_norm_new=unique_string(desp='temp_features_norm'), # for normalization usage |
| col_ind_var_tmp=unique_string(desp='temp_features'), |
| col_dep_var_norm_new=unique_string(desp='temp_target_norm'), # for normalization usage |
| col_dep_var_tmp=unique_string(desp='temp_target'), |
| tbl_result=tbl_result, |
| lambda_value=lambda_value, alpha=alpha, |
| dimension=dimension, row_num=row_num, |
| max_iter=max_iter, tolerance=tolerance, |
| outstr_array=outstr_array, |
| normalization=normalization) |
| |
| # Add the optimizer parameters |
| args.update(optimizer_params_dict) |
| |
| # Table names useful when normalizing the original data |
| # Note: in order to be consistent with the calling convention |
| # of the normalization functions, multiple elements of the dict |
| # actually have the same value. This is a price that one has to pay |
| # if he wants to save typing argument names by using **args as the |
| # function argument. |
| tbl_ind_scales = unique_string(desp='temp_ind_scales') |
| tbl_dep_scale = unique_string(desp='temp_dep_scales') |
| tbl_data_scaled = unique_string(desp='temp_data_scales') |
| args.update(tbl_dep_scale=tbl_dep_scale, |
| tbl_ind_scales=tbl_ind_scales, |
| tbl_data_scaled=tbl_data_scaled) |
| |
| # Table names used in IGD iterations |
| args.update(tbl_igd_state=unique_string(), |
| tbl_igd_args=unique_string()) |
| |
| # more, for args table |
| args["dimension_name"] = unique_string() |
| args["stepsize_name"] = unique_string() |
| args["lambda_name"] = unique_string() |
| args["alpha_name"] = unique_string() |
| args["total_rows_name"] = unique_string() |
| args["max_iter_name"] = unique_string() |
| args["tolerance_name"] = unique_string() |
| args["xmean_name"] = unique_string() |
| args["ymean_name"] = unique_string() |
| |
| return args |
| # ------------------------------------------------------------------------ |
| |
| |
| def _igd_cleanup_temp_tbls(**args): |
| """ |
| Drop all temporary tables used by IGD optimizer, |
| including tables used in the possible normalization |
| and IGD iterations. |
| """ |
| plpy.execute(""" |
| drop table if exists {tbl_ind_scales}; |
| drop table if exists {tbl_dep_scale}; |
| drop table if exists {tbl_data_scaled}; |
| drop table if exists {tbl_igd_args}; |
| drop table if exists pg_temp.{tbl_igd_state}; |
| drop table if exists {x_mean_table}; |
| drop table if exists {y_mean_table}; |
| """.format(**args)) |
| return None |
| # ------------------------------------------------------------------------ |
| |
| |
| def _elastic_net_igd_train(schema_madlib, func_step_aggregate, |
| func_state_diff, family, |
| tbl_source, col_ind_var, |
| col_dep_var, tbl_result, tbl_summary, lambda_value, alpha, |
| normalization, optimizer_params, max_iter, |
| tolerance, outstr_array, grouping_str, |
| grouping_col, **kwargs): |
| _elastic_net_validate_args(tbl_source, col_ind_var, col_dep_var, tbl_result, tbl_summary, |
| lambda_value, alpha, normalization, max_iter, tolerance) |
| |
| return _elastic_net_igd_train_compute(schema_madlib, func_step_aggregate, |
| func_state_diff, family, |
| tbl_source, col_ind_var, |
| col_dep_var, tbl_result, tbl_summary, lambda_value, alpha, |
| normalization, optimizer_params, max_iter, |
| tolerance, outstr_array, grouping_str, |
| grouping_col, **kwargs) |
| # ------------------------------------------------------------------------ |
| |
| |
| def _elastic_net_igd_train_compute(schema_madlib, func_step_aggregate, |
| func_state_diff, family, |
| tbl_source, col_ind_var, |
| col_dep_var, tbl_result, tbl_summary, lambda_value, alpha, |
| normalization, optimizer_params, max_iter, |
| tolerance, outstr_array, grouping_str, |
| grouping_col, **kwargs): |
| """ |
| Fit linear model with elastic net regularization using IGD optimization. |
| |
| @param tbl_source Name of data source table |
| @param col_ind_var Name of independent variable column, |
| independent variable is an array |
| @param col_dep_var Name of dependent variable column |
| @param tbl_result Name of the table to store the results, |
| will return fitting coefficients and |
| likelihood |
| @param lambda_value The regularization parameter |
| @param alpha The elastic net parameter, [0, 1] |
| @param normalization Whether to normalize the variables |
| @param optimizer_params Parameters of the above optimizer, the format |
| is '{arg = value, ...}'::varchar[] |
| """ |
| with MinWarning('error'): |
| (dimension, row_num) = _tbl_dimension_rownum(schema_madlib, |
| tbl_source, col_ind_var) |
| |
| # generate a full dict to ease the following string format |
| # including several temporary table names |
| args = _igd_construct_dict(schema_madlib, family, tbl_source, |
| col_ind_var, col_dep_var, tbl_result, dimension, row_num, |
| lambda_value, alpha, normalization, max_iter, tolerance, |
| outstr_array, _igd_params_parser(optimizer_params, lambda_value, |
| tolerance, schema_madlib)) |
| |
| args.update({'x_mean_table':unique_string(desp='x_mean_table')}) |
| args.update({'y_mean_table':unique_string(desp='y_mean_table')}) |
| args.update({'grouping_col': grouping_col}) |
| # use normalized data or not |
| if normalization: |
| _normalize_data(args) |
| tbl_used = args["tbl_data_scaled"] |
| args["col_ind_var_new"] = args["col_ind_var_norm_new"] |
| args["col_dep_var_new"] = args["col_dep_var_norm_new"] |
| else: |
| _compute_data_scales(args) |
| tbl_used = tbl_source |
| args["col_ind_var_new"] = col_ind_var |
| args["col_dep_var_new"] = col_dep_var |
| args["tbl_used"] = tbl_used |
| |
| # parameter values required by the IGD optimizer |
| (xmean, ymean) = _compute_means(args) |
| # average squares of each feature |
| # used to estimate the largest lambda value |
| # also used to screen out tiny values, so order is needed |
| args["sq"] = _compute_average_sq(**args) |
| args["sq_str"] = _array_to_string(args["sq"]) |
| |
| if args["warmup_lambdas"] is not None: |
| args["warm_no"] = len(args["warmup_lambdas"]) |
| |
| if args["warmup"] and args["warmup_lambdas"] is None: |
| args["warmup_lambdas"] = \ |
| _generate_warmup_lambda_sequence(lambda_value, |
| args["warmup_lambda_no"]) |
| args["warm_no"] = len(args["warmup_lambdas"]) |
| elif args["warmup"] is False: |
| args["warm_no"] = 1 |
| args["warmup_lambdas"] = [lambda_value] # only one value |
| |
| args.update({ |
| 'rel_args': args["tbl_igd_args"], |
| 'rel_state': args["tbl_igd_state"], |
| 'col_grp_iteration': unique_string(desp='col_grp_iteration'), |
| 'col_grp_state': unique_string(desp='col_grp_state'), |
| 'col_grp_key': unique_string(desp='col_grp_key'), |
| 'col_n_tuples': unique_string(desp='col_n_tuples'), |
| 'lambda_count': 1, |
| 'state_type': "double precision[]", |
| 'rel_source': tbl_used, |
| 'grouping_str': grouping_str, |
| 'xmean_val': xmean, |
| 'ymean_val': ymean, |
| 'tbl_source': tbl_source, |
| 'tbl_summary': tbl_summary |
| }) |
| if not args.get('parallel'): |
| func_step_aggregate += "_single_seg" |
| # perform the actual calculation |
| iteration_run = _compute_igd( |
| schema_madlib, |
| func_step_aggregate, |
| func_state_diff, |
| args["tbl_igd_args"], |
| args["tbl_igd_state"], |
| tbl_used, |
| args["col_ind_var_new"], |
| args["col_dep_var_new"], |
| grouping_str, |
| grouping_col, |
| start_iter=0, |
| max_iter=args["max_iter"], |
| tolerance=args["tolerance"], |
| warmup_tolerance=args["warmup_tolerance"], |
| warm_no=args["warm_no"], |
| step_decay=args["step_decay"], |
| dimension=args["dimension"], |
| stepsize=args["stepsize"], |
| lambda_name=args["warmup_lambdas"], |
| warmup_lambda_value=args.get('warmup_lambdas')[args["lambda_count"]-1], |
| alpha=args["alpha"], |
| row_num=args["row_num"], |
| xmean_val=args["xmean_val"], |
| ymean_val=args["ymean_val"], |
| lambda_count=args["lambda_count"], |
| rel_state=args["tbl_igd_state"], |
| col_grp_iteration=args["col_grp_iteration"], |
| col_grp_state=args["col_grp_state"], |
| col_grp_key=args["col_grp_key"], |
| col_n_tuples=args["col_n_tuples"], |
| rel_source=args["rel_source"], |
| state_type=args["state_type"]) |
| |
| _elastic_net_generate_result("igd", iteration_run, **args) |
| |
| # cleanup |
| _igd_cleanup_temp_tbls(**args) |
| return None |
| |
| # ------------------------------------------------------------------------ |
| |
| |
| def _compute_igd(schema_madlib, func_step_aggregate, func_state_diff, |
| tbl_args, tbl_state, tbl_source, |
| col_ind_var, col_dep_var, grouping_str, grouping_col, |
| start_iter, **kwargs): |
| """ |
| Driver function for elastic net with Gaussian response using IGD |
| |
| @param schema_madlib Name of the MADlib schema, properly escaped/quoted |
| @param tbl_args Name of the (temporary) table containing all non-template |
| arguments |
| @param tbl_state Name of the (temporary) table containing the inter-iteration |
| states |
| @param rel_source Name of the relation containing input points |
| @param col_ind_var Name of the independent variables column |
| @param col_dep_var Name of the dependent variable column |
| @param kwargs We allow the caller to specify additional arguments (all of |
| which will be ignored though). The purpose of this is to allow the |
| caller to unpack a dictionary whose element set is a superset of |
| the required arguments by this function. |
| |
| @return The iteration number (i.e., the key) with which to look up the |
| result in \c tbl_state |
| """ |
| args = locals() |
| |
| for k, v in kwargs.iteritems(): |
| if k not in args: |
| args.update({k: v}) |
| iterationCtrl = GroupIterationController(args) |
| with iterationCtrl as it: |
| it.iteration = start_iter |
| while True: |
| # manually add the intercept term |
| if (it.kwargs["lambda_count"] > len(args.get('lambda_name'))): |
| break |
| it.kwargs["warmup_lambda_value"] = args.get('lambda_name')[it.kwargs["lambda_count"] - 1] |
| # Fix for JIRA MADLIB-1092 |
| # 'col_n_tuples' is supposed to refer to the number of rows in the |
| # table, or the number of rows in a group. col_n_tuples gets |
| # the right value in in_mem_group_control, so using this instead |
| # of row_num (which was used hitherto). |
| it.update(""" |
| {schema_madlib}.{func_step_aggregate}( |
| ({col_ind_var})::double precision[], |
| ({col_dep_var}), |
| {rel_state}.{col_grp_state}, |
| ({warmup_lambda_value})::double precision, |
| ({alpha})::double precision, |
| ({dimension})::integer, |
| ({stepsize})::double precision, |
| ({col_n_tuples})::integer, |
| ('{xmean_val}')::double precision[], |
| ({ymean_val})::double precision, |
| ({step_decay})::double precision |
| ) |
| """) |
| if it.kwargs["lambda_count"] < it.kwargs["warm_no"]: |
| it.kwargs["use_tolerance"] = it.kwargs["warmup_tolerance"] |
| else: |
| it.kwargs["use_tolerance"] = it.kwargs["tolerance"] |
| |
| if it.test(""" |
| {iteration} > {max_iter} or |
| {schema_madlib}.{func_state_diff}( |
| _state_previous, _state_current) < {use_tolerance} |
| """): |
| if (it.iteration < it.kwargs["max_iter"] and |
| it.kwargs["lambda_count"] < it.kwargs["warm_no"]): |
| it.kwargs["lambda_count"] += 1 |
| else: |
| break |
| |
| it.final() |
| if it.kwargs["lambda_count"] < it.kwargs["warm_no"]: |
| plpy.error(""" |
| Elastic Net error: The final target lambda value is not |
| reached in warm-up iterations. You need more iterations! |
| """) |
| return iterationCtrl.iteration |