| import plpy |
| import math |
| import re |
| from utilities.utilities import _string_to_array |
| from utilities.utilities import _array_to_string |
| from convex.utils_regularization import __utils_ind_var_scales |
| from convex.utils_regularization import __utils_dep_var_scale |
| from convex.utils_regularization import __utils_normalize_data |
| from utilities.validate_args import table_exists |
| from utilities.control import IterationController2S |
| |
| from collections import namedtuple |
| |
| # ------------------------------------------------------------------------ |
| # -- constants ----------------------------------------------------------- |
| |
# The constants below are namedtuple instances, so they can be used as enums:
#   'igd' in OPTIMIZERS        (returns True)
#   'igd' == OPTIMIZERS.igd    (returns True)
# To add or rename an accepted value, update the field names and the
# matching values together.
BINOMIAL_FAMILIES = namedtuple("bin", ["binomial", "logistic"])(
    binomial='binomial', logistic='logistic')
GAUSSIAN_FAMILIES = namedtuple("gau", ["gaussian", "linear"])(
    gaussian='gaussian', linear='linear')
OPTIMIZERS = namedtuple("opt", ["igd", "fista"])(
    igd='igd', fista='fista')
| # ------------------------------------------------------------------------- |
| |
| |
| def _process_results(coef, intercept, outstr_array): |
| """ |
| Return features, features_selected, dense_coef |
| """ |
| if not outstr_array: |
| raise ValueError("Invalid feature name array: {0}".format(str(outstr_array))) |
| if not coef: |
| raise ValueError("Invalid coef array: {0}".format(str(coef))) |
| |
| features = _array_to_string(outstr_array) |
| selected_array = [] |
| dense_coef = [] |
| for i in range(len(coef)): |
| if coef[i] != 0: |
| selected_array.append(outstr_array[i]) |
| dense_coef.append(coef[i]) |
| features_selected = _array_to_string(selected_array) |
| dense_coef = _array_to_string(dense_coef) |
| |
| return (features, features_selected, dense_coef, _array_to_string(coef)) |
| # ------------------------------------------------------------------------ |
| |
| |
def _process_warmup_lambdas(lambdas, lambda_value):
    """
    Parse the warmup_lambdas string into a list of floats and validate it.

    @param lambdas The string to convert; it must start with one of "[{("
                   and end with one of "]})", e.g. "{3,2,1}"
    @param lambda_value The target value of lambda, which must be equal to
                        the last element of the parsed array

    @return The parsed lambdas as a list of floats, in strictly
            descending order
    """
    bracketed = re.match(r"^[\[\{\(](.*)[\]\}\)]$", lambdas)
    if bracketed is None:
        plpy.error("Elastic Net error: warmup_lambdas must be NULL or something like {3,2,1} !")

    values = list(map(float, _string_to_array(bracketed.group(1))))

    if values[-1] != lambda_value:
        plpy.error("""
            Elastic Net error: The last element of warmup_lambdas must
            be equal to the lambda value that you want to compute !
            """)

    # Every element has to be strictly larger than the one that follows it
    for current, following in zip(values, values[1:]):
        if current <= following:
            plpy.error("""
                Elastic Net error: The given warm-up array must be
                in a strict descent order.
                """)

    return values
| # ------------------------------------------------------------------------ |
| |
| |
| def _generate_warmup_lambda_sequence(lambda_value, n_steps): |
| """ |
| Compute lambda sequence, when warmup is True and warmup_lambdas are |
| not given |
| """ |
| if n_steps == 1: |
| return [lambda_value] |
| |
| largest = 1e5 |
| |
| if abs(lambda_value - 0.) < 1e-6: |
| zero_lambda = True |
| smallest = 0.001 * largest |
| n_steps -= 1 |
| else: |
| smallest = lambda_value |
| zero_lambda = False |
| |
| smallest, largest = min(smallest, largest), max(smallest, largest) |
| step = math.log(smallest / largest) / (float(n_steps) - 1) |
| constant = math.log(largest) |
| |
| seq = [math.exp(j * step + constant) for j in range(n_steps)] |
| if zero_lambda: |
| seq.append(0.) |
| |
| return seq |
| # ------------------------------------------------------------------------ |
| |
| |
| def _compute_average_sq(**args): |
| """ |
| Compute the average squares of all features, used to estimtae the largest lambda |
| Actually only the largest value is used, so order does not matter here |
| """ |
| sq = [1] * args["dimension"] |
| if args["normalization"] is False: |
| for i in range(args["dimension"]): |
| sq[i] = (args["x_scales"]["std"][i]) ** 2 + (args["x_scales"]["mean"][i]) ** 2 |
| |
| return sq |
| # ------------------------------------------------------------------------ |
| |
| |
| def _compute_log_likelihood(coef, intercept, **args): |
| """ |
| Compute the log-likelihood at the end of calculation |
| """ |
| if args["family"] == "gaussian": # linear models |
| loss = plpy.execute( |
| """ |
| select |
| avg(({col_dep_var_new} - {schema_madlib}.elastic_net_gaussian_predict( |
| '{coefficients}'::double precision[], |
| {intercept}::double precision, |
| {col_ind_var_new}))^2) / 2. |
| as loss |
| from |
| {tbl_used} |
| """.format(coefficients=_array_to_string(coef), |
| intercept=intercept, |
| **args))[0]["loss"] |
| elif args["family"] == "binomial": # logistic models |
| loss = plpy.execute( |
| """ |
| select |
| avg({schema_madlib}.__elastic_net_binomial_loglikelihood( |
| '{coefficients}'::double precision[], |
| {intercept}, |
| {col_dep_var_new}, |
| {col_ind_var_new})) |
| as loss |
| from {tbl_used} |
| """.format(coefficients=_array_to_string(coef), |
| intercept=intercept, |
| **args))[0]["loss"] |
| module_1 = sum(x * x for x in coef) / 2. |
| module_2 = sum(abs(x) for x in coef) |
| |
| log_likelihood = - (loss + args["lambda_value"] * |
| ((1 - args["alpha"]) * module_1 + args["alpha"] * module_2)) |
| return log_likelihood |
| # ------------------------------------------------------------------------ |
| |
| |
def _elastic_net_validate_args(tbl_source, col_ind_var, col_dep_var,
                               tbl_result, tbl_summary, lambda_value, alpha,
                               normalization, max_iter, tolerance):
    """
    Validate the user-supplied elastic net arguments.

    Reports through plpy.error when: lambda_value, alpha or normalization
    is None; tbl_source, col_ind_var, col_dep_var or tbl_result is empty;
    either output table already exists; lambda_value is negative; alpha is
    outside [0, 1]; max_iter is not positive; or tolerance is negative.

    @return None
    """
    has_null = any(value is None
                   for value in (lambda_value, alpha, normalization))
    if has_null or not all((tbl_source, col_ind_var, col_dep_var, tbl_result)):
        plpy.error("Elastic Net error: You have unsupported NULL/empty value(s) in the arguments!")

    # Neither output table may exist yet
    output_tables = (("Elastic Net error: Output table ", tbl_result),
                     ("Elastic Net error: Output summary table ", tbl_summary))
    for message_prefix, tbl_name in output_tables:
        if table_exists(tbl_name, only_first_schema=True):
            plpy.error(message_prefix + tbl_name + " already exists!")

    if lambda_value < 0:
        plpy.error("Elastic Net error: The regularization parameter lambda cannot be negative!")

    if alpha > 1 or alpha < 0:
        plpy.error("Elastic Net error: The elastic net control parameter alpha must be in [0,1] !")

    if max_iter <= 0:
        plpy.error("Elastic Net error: max_iter must be positive!")

    if tolerance < 0:
        plpy.error("Elastic Net error: tolerance must be positive!")

    return None
| # ------------------------------------------------------------------------ |
| |
| |
def _compute_data_scales(args):
    """
    Compute the scales of the independent and dependent variables and
    store them in args.

    Sets args["x_scales"], args["y_scale"] and args["xmean_str"]. For the
    "binomial" family y_scale is fixed to mean 0 and std 1; for any other
    family it is computed from the source table.

    @param args Dict of parameters; modified in place
    """
    source_table = args["tbl_source"]
    ind_var = args["col_ind_var"]

    args["x_scales"] = __utils_ind_var_scales(
        tbl_data=source_table,
        col_ind_var=ind_var,
        dimension=args["dimension"],
        schema_madlib=args["schema_madlib"])

    if args["family"] != "binomial":
        args["y_scale"] = __utils_dep_var_scale(
            schema_madlib=args["schema_madlib"],
            tbl_data=source_table,
            col_ind_var=ind_var,
            col_dep_var=args["col_dep_var"])
    else:
        args["y_scale"] = dict(mean=0, std=1)

    args["xmean_str"] = _array_to_string(args["x_scales"]["mean"])
| # ------------------------------------------------------------------------ |
| |
| |
def _normalize_data(args):
    """
    Compute the scaling factors for independent and dependent
    variables, and then scale the original data.

    The output is stored in tbl_data_scaled. The dependent variable is
    de-centered only for the "gaussian" family.

    @param args Dict of parameters; the computed scales are added to it
    @return None
    """
    _compute_data_scales(args)

    x_scales = args["x_scales"]
    y_scale = args["y_scale"]

    normalize_kwargs = dict(
        y_decenter=(args["family"] == "gaussian"),
        tbl_data=args["tbl_source"],
        col_ind_var=args["col_ind_var"],
        col_dep_var=args["col_dep_var"],
        tbl_data_scaled=args["tbl_data_scaled"],
        col_ind_var_norm_new=args["col_ind_var_norm_new"],
        col_dep_var_norm_new=args["col_dep_var_norm_new"],
        schema_madlib=args["schema_madlib"],
        x_mean_str=args["xmean_str"],
        x_std_str=_array_to_string(x_scales["std"]),
        y_mean=y_scale["mean"],
        y_std=y_scale["std"],
        grouping_col=args["grouping_col"])
    __utils_normalize_data(**normalize_kwargs)

    return None
| # ------------------------------------------------------------------------ |
| |
| |
def _tbl_dimension_rownum(schema_madlib, tbl_source, col_ind_var):
    """
    Measure the dimension and row number of source data table

    @param schema_madlib Schema that provides array_contains_null
    @param tbl_source Name of the source data table
    @param col_ind_var Independent variable (array) column or expression

    @return Tuple (dimension, row_num)
    """
    # Length of the independent variable array, taken from a single row
    dimension_sql = """
        select array_upper({col_ind_var},1) as dimension
        from {tbl_source} limit 1
        """.format(col_ind_var=col_ind_var,
                   tbl_source=tbl_source)
    dimension = plpy.execute(dimension_sql)[0]["dimension"]

    # Total row number of the data source table.
    # The WHERE clause ignores rows that contain one or more NULLs in the
    # independent variable (x). No NULL check is made for the dependent
    # variable (y), since one of the hard requirements/assumptions of the
    # input data to elastic_net is that the dependent variable cannot be NULL.
    count_sql = """
        select count(*) from {tbl_source}
        WHERE not {schema_madlib}.array_contains_null({col_ind_var})
        """.format(col_ind_var=col_ind_var,
                   schema_madlib=schema_madlib,
                   tbl_source=tbl_source)
    row_num = plpy.execute(count_sql)[0]["count"]

    return (dimension, row_num)
| # ------------------------------------------------------------------------ |
| |
| |
| def _compute_means(**args): |
| """ |
| Compute the averages of dependent (y) and independent (x) variables |
| """ |
| if args["normalization"]: |
| xmean_str = _array_to_string([0] * args["dimension"]) |
| ymean = 0 |
| return (xmean_str, ymean) |
| else: |
| return (args["xmean_str"], args["y_scale"]["mean"]) |
| # ------------------------------------------------------------------------ |
| |
| |
class IterationControllerNoTableDrop (IterationController2S):

    """
    IterationController but without table dropping

    Useful if one wants to use it in cross validation
    where dropping tables in a loop would use up all the locks
    and get "out of memory" error
    """
    # ------------------------------------------------------------------------

    def __init__(self, rel_args, rel_state, stateType,
                 temporaryTables=True,
                 truncAfterIteration=False,
                 schema_madlib="MADLIB_SCHEMA_MISSING",
                 verbose=False,
                 **kwargs):
        """
        Initialize the base controller, then record whether the state
        table already exists and how many rows it holds.
        """
        # Initialize the member fields through the base class. The base
        # method is called explicitly with `self` passed exactly once; the
        # previous super(...).__init__(self, ...) call passed `self` twice,
        # which shifted every argument by one position.
        IterationController2S.__init__(
            self, rel_args, rel_state, stateType, temporaryTables,
            truncAfterIteration, schema_madlib, verbose, **kwargs)
        # self.kwargs["rel_state"] = "pg_temp" + rel_state, but for testing
        # the existence of a table, schema name should be used together
        # NOTE(review): temporary tables may be listed under a
        # session-specific schema name rather than 'pg_temp' -- confirm
        # that this filter matches on the target database.
        self.state_exists = plpy.execute(
            "select count(*) from information_schema.tables "
            "where table_name = '{0}' and table_schema = 'pg_temp'".
            format(rel_state))[0]['count'] == 1
        # The current total row number of rel_state table; stays 0 when the
        # table does not exist yet, so the attribute is always defined
        self.state_row_num = 0
        if self.state_exists:
            self.state_row_num = plpy.execute("select count(*) from {rel_state}".
                                              format(**self.kwargs))[0]["count"]

    # ------------------------------------------------------------------------

    def update(self, newState):
        """
        Update state of calculation.

        @param newState SQL template producing the new state; it is
                        formatted with the iteration number (before it is
                        incremented) and self.kwargs
        """
        newState = newState.format(iteration=self.iteration, **self.kwargs)
        self.iteration += 1
        if self.state_exists and self.iteration <= self.state_row_num:
            # If the rel_state table already exists, and
            # iteration number is smaller than total row number,
            # use UPDATE instead of append. UPDATE does not use
            # extra locks.
            self.runSQL("""
                update {rel_state} set _state = ({newState})
                where _iteration = {iteration}
                """.format(iteration=self.iteration,
                           newState=newState,
                           **self.kwargs))
        else:
            # rel_state table is newly created, and
            # append data to this table
            self.runSQL("""
                INSERT INTO {rel_state}
                SELECT
                    {iteration},
                    ({newState})
                """.format(iteration=self.iteration,
                           newState=newState,
                           **self.kwargs))
    # ------------------------------------------------------------------------

    def __enter__(self):
        """
        __enter__ and __exit__ methods are special. They are automatically called
        when using "with" block.
        """
        if self.state_exists is False:
            # create rel_state table when it does not already exist.
            # Explicit base-class call with a single `self`; the previous
            # super(...).__enter__(self) passed one argument too many.
            IterationController2S.__enter__(self)
        self.inWith = True
        return self
| # ------------------------------------------------------------------------ |
| |
| |
class IterationControllerTableAppend (IterationControllerNoTableDrop):

    """
    Iteration controller whose update() always appends a new row to the
    state table.

    The constructor sets the member fields itself and does not call the
    parent constructors.
    """

    def __init__(self, rel_args, rel_state, stateType,
                 temporaryTables=True,
                 truncAfterIteration=False,
                 schema_madlib="MADLIB_SCHEMA_MISSING",
                 verbose=False,
                 **kwargs):
        # Settings that do not depend on the database
        self.iteration = -1
        self.inWith = False
        self.verbose = verbose
        self.truncAfterIteration = truncAfterIteration
        self.temporaryTables = temporaryTables

        # Extra keyword arguments are kept together with the relation names
        # so that they can be substituted into SQL templates
        kwargs.update(
            rel_args=rel_args,
            rel_state=rel_state,
            stateType=stateType.format(schema_madlib=schema_madlib),
            schema_madlib=schema_madlib)
        self.kwargs = kwargs

        # The state table counts as existing when exactly one table with
        # this name is listed in information_schema.tables
        lookup_sql = """
            select count(*)
            from information_schema.tables
            where table_name = '{rel_state}'
            """.format(**self.kwargs)
        self.state_exists = plpy.execute(lookup_sql)[0]['count'] == 1
    # ------------------------------------------------------------------------

    def update(self, newState):
        """
        Update state of calculation.

        @param newState SQL template producing the new state; it is
                        formatted with the iteration number (before it is
                        incremented) and self.kwargs
        """
        state_expr = newState.format(iteration=self.iteration, **self.kwargs)
        self.iteration += 1
        insert_sql = """
            INSERT INTO {rel_state}
            SELECT
                {iteration},
                ({newState})
            """.format(iteration=self.iteration,
                       newState=state_expr,
                       **self.kwargs)
        self.runSQL(insert_sql)