| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| """ |
| @file mlp_igd.py_in |
| |
| @brief Multilayer perceptron using IGD: Driver functions |
| |
| @namespace mlp_igd |
| """ |
| import math |
| import plpy |
| |
| from convex.utils_regularization import utils_ind_var_scales |
| from convex.utils_regularization import utils_ind_var_scales_grouping |
| from convex.utils_regularization import __utils_normalize_data |
| from convex.utils_regularization import __utils_normalize_data_grouping |
| |
| from internal.db_utils import get_distinct_col_levels |
| from internal.db_utils import get_one_hot_encoded_expr |
| from internal.db_utils import quote_literal |
| from utilities.control import MinWarning |
| from utilities.in_mem_group_control import GroupIterationController |
| from utilities.utilities import _array_to_string |
| from utilities.utilities import _assert |
| from utilities.utilities import _assert_equal |
| from utilities.utilities import _string_to_array_with_quotes |
| from utilities.utilities import add_postfix |
| from utilities.utilities import extract_keyvalue_params |
| from utilities.utilities import get_grouping_col_str |
| from utilities.utilities import is_valid_psql_type |
| from utilities.utilities import NUMERIC, INTEGER, TEXT, BOOLEAN |
| from utilities.utilities import INCLUDE_ARRAY, ONLY_ARRAY |
| from utilities.utilities import py_list_to_sql_string as PY2SQL |
| from utilities.utilities import strip_end_quotes, split_quoted_delimited_str |
| from utilities.utilities import unique_string |
| from utilities.validate_args import array_col_has_same_dimension |
| from utilities.validate_args import cols_in_tbl_valid |
| from utilities.validate_args import get_col_dimension |
| from utilities.validate_args import get_expr_type |
| from utilities.validate_args import input_tbl_valid |
| from utilities.validate_args import is_var_valid |
| from utilities.validate_args import output_tbl_valid |
| from utilities.validate_args import table_exists |
| from utilities.validate_args import quote_ident |
| from utilities.minibatch_validation import validate_dependent_var_for_minibatch |
| from utilities.utilities import create_cols_from_array_sql_string |
| |
| |
| |
| @MinWarning("error") |
| def mlp(schema_madlib, source_table, output_table, independent_varname, |
| dependent_varname, hidden_layer_sizes, optimizer_param_str, activation, |
| is_classification, weights, warm_start, verbose=False, grouping_col=""): |
| """ |
| Args: |
| @param schema_madlib |
| @param source_table |
| @param output_table |
| @param independent_varname |
| @param dependent_varname |
| @param hidden_layer_sizes |
| @param optimizer_param_str |
| |
| Returns: |
| None |
| """ |
| warm_start = bool(warm_start) |
| optimizer_params = _get_optimizer_params(optimizer_param_str or "") |
| summary_table = add_postfix(output_table, "_summary") |
| standardization_table = add_postfix(output_table, "_standardization") |
| hidden_layer_sizes = hidden_layer_sizes or [] |
| |
| _validate_args(source_table, output_table, summary_table, |
| standardization_table, independent_varname, |
| dependent_varname, hidden_layer_sizes, optimizer_params, |
| warm_start, activation, grouping_col) |
| |
| tolerance = optimizer_params['tolerance'] |
| n_iterations = optimizer_params['n_iterations'] |
| step_size_init = optimizer_params['learning_rate_init'] |
| iterations_per_step = optimizer_params['iterations_per_step'] |
| power = optimizer_params['power'] |
| gamma = optimizer_params['gamma'] |
| step_size = step_size_init |
| n_tries = optimizer_params['n_tries'] |
| # lambda is a reserved word in python |
| lambda_ = optimizer_params['lambda'] |
| batch_size = optimizer_params['batch_size'] |
| n_epochs = optimizer_params['n_epochs'] |
| momentum = optimizer_params['momentum'] |
| is_nesterov = optimizer_params['nesterov'] |
| |
| # Note that we don't support weights with mini batching yet, so validate |
| # this based on is_minibatch_enabled. |
| weights = '1' if not weights or not weights.strip() else weights.strip() |
| |
| is_minibatch_enabled = check_if_minibatch_enabled(source_table, independent_varname) |
| _validate_params_based_on_minibatch(source_table, independent_varname, |
| dependent_varname, weights, |
| is_classification, |
| is_minibatch_enabled) |
| activation = _get_activation_function_name(activation) |
| learning_rate_policy = _get_learning_rate_policy_name( |
| optimizer_params["learning_rate_policy"]) |
| activation_index = _get_activation_index(activation) |
| |
| # The original dependent_varname is required later if warm start is |
| # used, and while creating the model summary table. Keep a copy of it |
| # since dependent_varname is overwritten if one hot encoding is used. |
| dependent_varname_backup = dependent_varname |
| classes = [] |
| |
| if is_minibatch_enabled: |
| mlp_preprocessor = MLPMinibatchPreProcessor(source_table) |
| pp_summary_dict = mlp_preprocessor.preprocessed_summary_dict |
| |
| if (pp_summary_dict[MLPMinibatchPreProcessor.GROUPING_COL]): |
| # if a valid grouping_col is provided then it should be same as the |
| # grouping_col used in preprocessing. |
| _assert(grouping_col == pp_summary_dict[MLPMinibatchPreProcessor.GROUPING_COL], |
| "MLP: Grouping column input should be same as the one used " |
| "in the preprocessor.") |
| |
| batch_size = min(200, pp_summary_dict['buffer_size'])\ |
| if batch_size == 1 else batch_size |
| tbl_data_scaled = source_table |
| col_ind_var_norm_new = MLPMinibatchPreProcessor.INDEPENDENT_VARNAME |
| col_dep_var_norm_new = MLPMinibatchPreProcessor.DEPENDENT_VARNAME |
| x_mean_table = mlp_preprocessor.std_table |
| num_input_nodes = get_col_dimension(source_table, independent_varname, |
| dim=2) |
| if is_classification: |
| if pp_summary_dict["class_values"]: |
| classes = [quote_literal(c) for c in pp_summary_dict["class_values"]] |
| num_output_nodes = len(classes) |
| else: |
| # Assume that the dependent variable is already one-hot-encoded |
| num_output_nodes = get_col_dimension(source_table, |
| dependent_varname, |
| dim=2) |
| else: |
| num_output_nodes = get_col_dimension(source_table, |
| dependent_varname, dim=2) |
| dependent_vartype = pp_summary_dict["dependent_vartype"] |
| else: |
| grouping_col = grouping_col or "" |
| x_mean_table = unique_string(desp='x_mean_table') |
| tbl_data_scaled = unique_string(desp="tbl_data_scaled") |
| col_ind_var_norm_new = unique_string(desp="ind_var_norm") |
| col_dep_var_norm_new = unique_string(desp="dep_var_norm") |
| # Standardize the data, and create a standardized version of the |
| # source_table in tbl_data_scaled. Use this standardized table for IGD. |
| num_input_nodes = get_col_dimension(source_table, independent_varname, |
| dim=1) |
| dimension = num_input_nodes # dimension is used for normalize |
| normalize_data(locals()) |
| dependent_vartype = get_expr_type(dependent_varname, source_table) |
| |
| # We are now using tbl_data_scaled, so change the dependent |
| # varname accordingly. |
| dependent_varname = col_dep_var_norm_new |
| if is_classification: |
| # If dependent variable is an array during classification, assume |
| # that it is already one-hot-encoded. |
| if "[]" in dependent_vartype: |
| num_output_nodes = get_col_dimension(tbl_data_scaled, |
| dependent_varname) |
| else: |
| classes = get_distinct_col_levels( |
| source_table, dependent_varname_backup, dependent_vartype) |
| num_output_nodes = len(classes) |
| dependent_varname = get_one_hot_encoded_expr(dependent_varname, |
| classes) |
| else: |
| if "[]" not in dependent_vartype: |
| dependent_varname = "ARRAY[{0}]".format(col_dep_var_norm_new) |
| num_output_nodes = get_col_dimension(tbl_data_scaled, |
| dependent_varname, dim=1) |
| |
| reserved_cols = ['coeff', 'loss', 'n_iterations'] |
| grouping_str, grouping_col = get_grouping_col_str(schema_madlib, 'MLP', |
| reserved_cols, |
| source_table, |
| grouping_col) |
| # Need layers sizes before validating for warm_start |
| layer_sizes = [num_input_nodes] + hidden_layer_sizes + [num_output_nodes] |
| col_grp_key = unique_string(desp='col_grp_key') |
| if warm_start: |
| coeff = _validate_warm_start(output_table, summary_table, |
| standardization_table, independent_varname, |
| dependent_varname_backup, layer_sizes, |
| optimizer_params, is_classification, |
| weights, warm_start, activation) |
| if grouping_col: |
| # get an independent warm start coefficient for each group |
| grouping_col_list = split_quoted_delimited_str(grouping_col) |
| join_condition = ' AND '.join(['p.{0} = {0}'.format(col) |
| for col in grouping_col_list]) |
| start_coeff = """ |
| SELECT coeff |
| FROM {output_table} as p |
| WHERE {join_condition} AND |
| array_to_string(ARRAY[{grouping_str}], ',') = {col_grp_key} |
| """.format(**locals()) |
| else: |
| start_coeff = PY2SQL(coeff, array_type="DOUBLE PRECISION") |
| else: |
| # if warm start not enabled then initialization is the responsibility of |
| # the model |
| start_coeff = "NULL::DOUBLE PRECISION[]" |
| |
| if grouping_col: |
| group_by_clause = "GROUP BY {0}, {1}".format(grouping_col, col_grp_key) |
| grouping_str_comma = grouping_col + "," |
| using_clause = "USING ({0})".format(col_grp_key) |
| else: |
| group_by_clause, grouping_str_comma, using_clause = "", "", "ON TRUE" |
| |
| # local variables |
| it_args = { |
| "schema_madlib": schema_madlib, |
| "independent_varname": independent_varname, |
| "dependent_varname": dependent_varname, |
| "prev_state": None, |
| "layer_sizes": PY2SQL(layer_sizes, array_type="DOUBLE PRECISION"), |
| "step_size": step_size, |
| "source_table": source_table, |
| "output_table": output_table, |
| "activation": activation_index, |
| "is_classification": int(is_classification), |
| "weights": weights, |
| "warm_start": warm_start, |
| "n_iterations": n_iterations, |
| "tolerance": tolerance, |
| "lambda_": lambda_, |
| "grouping_col": grouping_col, |
| "grouping_str": grouping_str, |
| "x_mean_table": x_mean_table, |
| "batch_size": batch_size, |
| "n_epochs": n_epochs, |
| "start_coeff": start_coeff, |
| "momentum": momentum, |
| "is_nesterov": is_nesterov |
| } |
| # variables to be used by GroupIterationController |
| it_args.update({ |
| 'rel_args': unique_string(desp='rel_args'), |
| 'rel_state': unique_string(desp='rel_state'), |
| 'col_grp_iteration': unique_string(desp='col_grp_iteration'), |
| 'col_grp_state': unique_string(desp='col_grp_state'), |
| 'col_grp_key': col_grp_key, |
| 'col_n_tuples': unique_string(desp='col_n_tuples'), |
| 'state_type': "double precision[]", |
| 'rel_source': tbl_data_scaled, |
| 'col_ind_var': col_ind_var_norm_new, |
| 'col_dep_var': col_dep_var_norm_new, |
| 'state_size': -1 |
| }) |
| # variables used in constructing output tables |
| it_args.update({ |
| 'group_by_clause': group_by_clause, |
| 'using_clause': using_clause, |
| 'grouping_str_comma': grouping_str_comma, |
| }) |
| |
| first_try = True |
| temp_output_table = unique_string(desp='temp_output_table') |
| |
| for _ in range(n_tries): |
| prev_state = None |
| iterationCtrl = GroupIterationController(it_args) |
| with iterationCtrl as it: |
| it.iteration = 0 |
| while True: |
| if learning_rate_policy == "exp": |
| step_size = step_size_init * gamma**it.iteration |
| elif learning_rate_policy == "inv": |
| step_size = step_size_init * (it.iteration+1)**(-power) |
| elif learning_rate_policy == "step": |
| step_size = step_size_init * gamma**( |
| math.floor(it.iteration / iterations_per_step)) |
| it.kwargs['step_size'] = step_size |
| if is_minibatch_enabled: |
| train_sql = """ |
| {schema_madlib}.mlp_minibatch_step( |
| ({independent_varname})::DOUBLE PRECISION[], |
| ({dependent_varname})::DOUBLE PRECISION[], |
| {rel_state}.{col_grp_state}, |
| {layer_sizes}, |
| ({step_size})::FLOAT8, |
| {activation}, |
| {is_classification}, |
| ({weights})::DOUBLE PRECISION, |
| ({start_coeff})::DOUBLE PRECISION[], |
| {lambda_}, |
| {batch_size}::integer, |
| {n_epochs}::integer, |
| {momentum}::FLOAT8, |
| {is_nesterov}::boolean |
| ) |
| """ |
| else: |
| train_sql = """ |
| {schema_madlib}.mlp_igd_step( |
| ({col_ind_var})::DOUBLE PRECISION[], |
| ({dependent_varname})::DOUBLE PRECISION[], |
| {rel_state}.{col_grp_state}, |
| {layer_sizes}, |
| ({step_size})::FLOAT8, |
| {activation}, |
| {is_classification}, |
| ({weights})::DOUBLE PRECISION, |
| ({start_coeff})::DOUBLE PRECISION[], |
| {lambda_}, |
| {momentum}::FLOAT8, |
| {is_nesterov}::boolean |
| ) |
| """ |
| it.update(train_sql) |
| if it.kwargs['state_size'] == -1: |
| it.kwargs['state_size'] = it.get_state_size() |
| |
| |
| if it.test(""" |
| {iteration} >= {n_iterations} |
| OR |
| abs(_state_previous[{state_size}] - |
| _state_current[{state_size}]) < {tolerance} |
| """): |
| break |
| if verbose and it.iteration < n_iterations: |
| # Get loss value from the state. |
| res = it.get_param_value_per_group( |
| "_state_current[array_upper(_state_current, 1)] AS loss") |
| # Create a list of grouping values if grouping_cols was |
| # used, it will be an empty list if there was no grouping. |
| groups = [t[col_grp_key] for t in res if t[col_grp_key]] |
| losses = [t['loss'] for t in res] |
| loss = zip(groups, losses) if groups else losses |
| plpy.info("Iteration: {0}, Loss: <{1}>". |
| format(it.iteration, ', '.join(map(str, loss)))) |
| it.final() |
| _update_temp_model_table(it_args, it.iteration, temp_output_table, |
| is_minibatch_enabled, first_try) |
| first_try = False |
| layer_sizes_str = PY2SQL(layer_sizes, array_type="integer") |
| |
| _create_summary_table(locals()) |
| if is_minibatch_enabled: |
| # We already have the mean and std in the input standardization table |
| input_std_table = add_postfix(source_table, '_standardization') |
| _create_standardization_table(standardization_table, input_std_table, |
| warm_start) |
| # The original input table is the tab_data_scaled for mini batch. |
| # Do NOT drop tbl_data_scaled and x_mean_table with minibatch, |
| # it will end up dropping the original data table. |
| else: |
| _create_standardization_table(standardization_table, x_mean_table, |
| warm_start) |
| # Drop the following tables only for IGD. |
| plpy.execute("DROP TABLE IF EXISTS {0}".format(tbl_data_scaled)) |
| plpy.execute("DROP TABLE IF EXISTS {0}".format(x_mean_table)) |
| |
| _create_output_table(output_table, temp_output_table, grouping_col, |
| warm_start) |
| plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_output_table)) |
| return None |
| |
| def normalize_data(args): |
| """ |
| Create a new temp table (tbl_data_scaled) with the standardized version |
| of the independent variable column (independent_varname) in the input |
| data table (source_table). |
| """ |
| # We don't have to standardize the dependent variable. |
| y_decenter = False |
| # For MLP we need to change std. dev value of 0 to 1. Set the following |
| # flag to True to invoke the corresponding utils_regularization method. |
| set_zero_std_to_one = True |
| if args["grouping_col"]: |
| # When grouping_col is defined, we must find an array containing |
| # the mean and std of every dimension in the independent variable (x) |
| # specific to groups. Store these results in temp tables x_mean_table |
| # __utils_normalize_data_grouping reads the various means and stds |
| # from the tables. |
| utils_ind_var_scales_grouping(args["source_table"], |
| args["independent_varname"], |
| args["dimension"], args["schema_madlib"], |
| args["grouping_col"], |
| args["x_mean_table"], |
| set_zero_std_to_one) |
| __utils_normalize_data_grouping(y_decenter, |
| tbl_data=args["source_table"], |
| col_ind_var=args["independent_varname"], |
| col_dep_var=args["dependent_varname"], |
| tbl_data_scaled=args["tbl_data_scaled"], |
| col_ind_var_norm_new=args["col_ind_var_norm_new"], |
| col_dep_var_norm_new=args["col_dep_var_norm_new"], |
| schema_madlib=args["schema_madlib"], |
| x_mean_table=args["x_mean_table"], |
| y_mean_table='', |
| grouping_col=args["grouping_col"]) |
| else: |
| # When no grouping_col is defined, the mean and std for 'x' |
| # can be defined using strings, stored in x_mean_str, x_std_str. |
| # We don't need a table like how we needed for grouping. |
| x_scaled_vals = utils_ind_var_scales(args["source_table"], |
| args["independent_varname"], |
| args["dimension"], |
| args["schema_madlib"], |
| args["x_mean_table"], |
| set_zero_std_to_one) |
| x_mean_str = _array_to_string(x_scaled_vals["mean"]) |
| x_std_str = _array_to_string(x_scaled_vals["std"]) |
| __utils_normalize_data(y_decenter, |
| tbl_data=args["source_table"], |
| col_ind_var=args["independent_varname"], |
| col_dep_var=args["dependent_varname"], |
| tbl_data_scaled=args["tbl_data_scaled"], |
| col_ind_var_norm_new=args["col_ind_var_norm_new"], |
| col_dep_var_norm_new=args["col_dep_var_norm_new"], |
| schema_madlib=args["schema_madlib"], |
| x_mean_str=x_mean_str, |
| x_std_str=x_std_str, |
| y_mean='', |
| y_std='', |
| grouping_col=args["grouping_col"]) |
| |
| return None |
| # ------------------------------------------------------------------------ |
| |
| |
| def _create_standardization_table(standardization_table, x_mean_table, warm_start): |
| if warm_start: |
| plpy.execute("DROP TABLE IF EXISTS {0}".format(standardization_table)) |
| standarization_table_creation_query = """ |
| CREATE TABLE {standardization_table} AS ( |
| SELECT * FROM {x_mean_table} |
| ) |
| """.format(**locals()) |
| plpy.execute(standarization_table_creation_query) |
| |
| |
| def _create_summary_table(args): |
| grouping_text = "NULL" if not args['grouping_col'] else args['grouping_col'] |
| args.update(locals()) |
| if args['warm_start']: |
| plpy.execute("DROP TABLE IF EXISTS {0}".format(args['summary_table'])) |
| |
| classes_type = args['dependent_vartype'] |
| minibatch_summary_col_names = '' |
| minibatch_summary_col_vals = '' |
| dependent_vartype_colname = 'dependent_vartype' |
| if args['is_minibatch_enabled']: |
| # Add a few more columns in the summary table |
| minibatch_summary_col_names = """ |
| original_source_table TEXT, |
| original_independent_varname TEXT, |
| original_dependent_varname TEXT, |
| batch_size INTEGER, |
| n_epochs INTEGER, |
| """ |
| dependent_vartype_colname = 'original_dependent_vartype' |
| mlp_pre_dict = args['pp_summary_dict'] |
| source_table = mlp_pre_dict['source_table'] |
| independent_varname = mlp_pre_dict['independent_varname'] |
| dependent_varname = mlp_pre_dict['dependent_varname'] |
| # This variable is used for creating the classes_str column in the model |
| # summary table. We append [] when we create this column in the create |
| # summary table command so we need to strip it out here. |
| if classes_type[-2:] == '[]': |
| classes_type = classes_type[:-2] |
| batch_size = args['batch_size'] |
| n_epochs = args['n_epochs'] |
| minibatch_summary_col_vals = """ |
| $__madlib__${source_table}$__madlib__$::TEXT, |
| $__madlib__${independent_varname}$__madlib__$::TEXT, |
| $__madlib__${dependent_varname}$__madlib__$::TEXT, |
| {batch_size}, |
| {n_epochs}, |
| """.format(**locals()) |
| |
| summary_table_creation_query = """ |
| CREATE TABLE {summary_table}( |
| source_table TEXT, |
| independent_varname TEXT, |
| dependent_varname TEXT, |
| {minibatch_summary_col_names} |
| {dependent_vartype_colname} TEXT, |
| tolerance FLOAT, |
| learning_rate_init FLOAT, |
| learning_rate_policy TEXT, |
| momentum FLOAT, |
| nesterov BOOLEAN, |
| n_iterations INTEGER, |
| n_tries INTEGER, |
| layer_sizes INTEGER[], |
| activation TEXT, |
| is_classification BOOLEAN, |
| classes {classes_type}[], |
| weights VARCHAR, |
| grouping_col VARCHAR |
| )""".format(minibatch_summary_col_names=minibatch_summary_col_names, |
| dependent_vartype_colname=dependent_vartype_colname, |
| classes_type=classes_type, |
| **args) |
| summary_table_update_query = """ |
| INSERT INTO {summary_table} VALUES( |
| '{source_table}', |
| $__madlib__${independent_varname}$__madlib__$::TEXT, |
| $__madlib__${dependent_varname_backup}$__madlib__$::TEXT, |
| {minibatch_summary_col_vals} |
| '{dependent_vartype}', |
| {tolerance}, |
| {step_size_init}, |
| '{learning_rate_policy}', |
| {momentum}, |
| {is_nesterov}, |
| {n_iterations}, |
| {n_tries}, |
| {layer_sizes_str}, |
| '{activation}', |
| {is_classification}, |
| {classes_str}, |
| '{weights}', |
| '{grouping_text}' |
| )""".format(classes_str=PY2SQL(args['classes'], array_type=classes_type, |
| long_format=True), |
| minibatch_summary_col_vals=minibatch_summary_col_vals, |
| **args) |
| plpy.execute(summary_table_creation_query) |
| plpy.execute(summary_table_update_query) |
| |
| |
| def _create_output_table(output_table, temp_output_table, |
| grouping_col, warm_start): |
| grouping_col_comma = '' |
| partition_by = '' |
| if grouping_col: |
| grouping_col_comma = grouping_col + "," |
| partition_by = " PARTITION BY {0} ".format(grouping_col) |
| if warm_start: |
| plpy.execute("DROP TABLE IF EXISTS {0}".format(output_table)) |
| build_output_query = """ |
| CREATE TABLE {output_table} AS |
| SELECT {grouping_col_comma} coeff, loss, num_iterations |
| FROM ( |
| SELECT {temp_output_table}.*, |
| row_number() OVER ({partition_by} ORDER BY loss) AS rank |
| FROM {temp_output_table} |
| ) x |
| WHERE x.rank = 1; |
| """.format(**locals()) |
| plpy.execute(build_output_query) |
| |
| |
| def _update_temp_model_table(args, iteration, temp_output_table, |
| is_minibatch_enabled, first_try): |
| insert_or_create_str = "INSERT INTO {0}" |
| if first_try: |
| insert_or_create_str = "CREATE TEMP TABLE {0} as" |
| insert_or_create_str = insert_or_create_str.format(temp_output_table) |
| join_clause = '' |
| if args['grouping_col']: |
| join_clause = """ |
| JOIN |
| ( |
| SELECT |
| {grouping_str_comma} |
| array_to_string(ARRAY[{grouping_str}], |
| ',' |
| ) AS {col_grp_key} |
| FROM {source_table} |
| {group_by_clause} |
| ) grouping_q |
| {using_clause} |
| """.format(**args) |
| if is_minibatch_enabled: |
| internal_result_udf = "internal_mlp_minibatch_result" |
| else: |
| internal_result_udf = "internal_mlp_igd_result" |
| model_table_query = """ |
| {insert_or_create_str} |
| SELECT |
| {grouping_str_comma} |
| (result).coeff as coeff, |
| (result).loss as loss, |
| {iteration} as num_iterations |
| FROM ( |
| SELECT |
| {schema_madlib}.{internal_result_udf}( |
| {col_grp_state} |
| ) AS result, |
| {col_grp_key} |
| FROM {rel_state} |
| WHERE {col_grp_iteration} = {iteration} |
| ) rel_state_subq |
| {join_clause} |
| """.format(insert_or_create_str=insert_or_create_str, |
| iteration=iteration, join_clause=join_clause, |
| internal_result_udf=internal_result_udf, **args) |
| plpy.execute(model_table_query) |
| |
| def _get_optimizer_params(param_str): |
| params_defaults = { |
| "learning_rate_init": (0.001, float), |
| "n_iterations": (100, int), |
| "n_tries": (1, int), |
| "tolerance": (0.001, float), |
| "learning_rate_policy": ("constant", str), |
| "gamma": (0.1, float), |
| "iterations_per_step": (100, int), |
| "power": (0.5, float), |
| "lambda": (0, float), |
| "n_epochs": (1, int), |
| "batch_size": (1, int), |
| "momentum": (0.9, float), |
| "nesterov": (True, bool) |
| } |
| param_defaults = dict([(k, v[0]) for k, v in params_defaults.items()]) |
| param_types = dict([(k, v[1]) for k, v in params_defaults.items()]) |
| |
| if not param_str: |
| return param_defaults |
| |
| name_value = extract_keyvalue_params( |
| param_str, param_types, param_defaults, ignore_invalid=False) |
| return name_value |
| |
| def _validate_standardization_table(standardization_table, glist=[]): |
| input_tbl_valid(standardization_table, 'MLP') |
| cols_in_tbl_valid(standardization_table, glist + ['mean', 'std'], 'MLP') |
| |
| def _validate_summary_table(summary_table): |
| input_tbl_valid(summary_table, 'MLP') |
| cols_in_tbl_valid(summary_table, [ |
| 'dependent_varname', 'independent_varname', 'activation', |
| 'tolerance', 'learning_rate_init', 'n_iterations', 'n_tries', |
| 'classes', 'layer_sizes', 'source_table' |
| ], 'MLP') |
| |
| def _validate_warm_start(output_table, summary_table, standardization_table, |
| independent_varname, dependent_varname, layer_sizes, |
| optimizer_params, is_classification, weights, |
| warm_start, activation): |
| _assert(table_exists(output_table), |
| "MLP error: Warm start failed due to missing model table: " + output_table) |
| _assert(table_exists(summary_table), |
| "MLP error: Warm start failed due to missing summary table: " + summary_table) |
| _assert(table_exists(standardization_table), |
| "MLP error: Warm start failed due to missing standardization table: " + standardization_table) |
| |
| _assert(optimizer_params["n_tries"] == 1, |
| "MLP error: warm_start is only compatible for n_tries = 1") |
| |
| summary = plpy.execute("SELECT * FROM {0}".format(summary_table))[0] |
| params = [ |
| "independent_varname", "dependent_varname", "layer_sizes", |
| "is_classification", "weights", "activation" |
| ] |
| for param in params: |
| _assert_equal(eval(param), summary[param], |
| "MLP error: warm start failed due to different parameter value: " + |
| param) |
| output = plpy.execute("SELECT * FROM {0}".format(output_table)) |
| num_coeffs = sum( |
| map(lambda i: (layer_sizes[i] + 1) * (layer_sizes[i + 1]), |
| range(len(layer_sizes) - 1))) |
| for row in output: |
| coeff = row['coeff'] |
| _assert_equal(num_coeffs, |
| len(coeff), |
| "MLP error: Warm start failed to invalid output_table: " + |
| output_table + ". Invalid number of coefficients in model.") |
| return coeff |
| |
| def _validate_dependent_var(source_table, dependent_varname, |
| is_classification, is_minibatch_enabled): |
| expr_type = get_expr_type(dependent_varname, source_table) |
| classification_types = INTEGER | BOOLEAN | TEXT |
| |
| if is_minibatch_enabled: |
| if is_classification: |
| validate_dependent_var_for_minibatch( |
| source_table, dependent_varname, expr_type) |
| else: |
| if is_classification: |
| _assert((is_valid_psql_type(expr_type, NUMERIC | ONLY_ARRAY) |
| and not _get_dep_var_second_dim(dependent_varname, source_table) |
| ) |
| or is_valid_psql_type(expr_type, classification_types), |
| "Dependent variable column should either be a numeric 1-D" |
| " array, or be of type: {0}".format(','.join(classification_types))) |
| else: |
| _assert(is_valid_psql_type(expr_type, NUMERIC | INCLUDE_ARRAY), |
| "Dependent variable column should be of numeric type.") |
| |
| def _get_dep_var_second_dim(dependent_varname, source_table): |
| # Check if dependent variable is an array of two or higher dimension |
| # Return back the value of the second dimension, returns None if it less |
| # than 2-D. |
| dep_array_sec_dim = plpy.execute(""" |
| SELECT array_upper({0}, 2) AS n_y |
| FROM {1} |
| LIMIT 1 |
| """.format(dependent_varname, source_table)) |
| return dep_array_sec_dim[0]['n_y'] |
| |
| def _validate_params_based_on_minibatch(source_table, independent_varname, |
| dependent_varname, weights, |
| is_classification, |
| is_minibatch_enabled): |
| """ |
| Some params have to be validated after knowing if the solver is |
| minibatch or not. |
| """ |
| if is_minibatch_enabled: |
| _assert(weights == '1', |
| "MLP Error: The input weights param is not supported with" |
| " mini-batch version of MLP.") |
| else: |
| int_types = ['integer', 'smallint', 'bigint'] |
| float_types = ['double precision', 'real'] |
| _assert(get_expr_type(weights, source_table) in int_types + float_types, |
| "MLP error: Weights should be a numeric type") |
| _assert(array_col_has_same_dimension(source_table, independent_varname), |
| "Independent variable column should refer to arrays of the same length") |
| |
| _validate_dependent_var(source_table, dependent_varname, |
| is_classification, is_minibatch_enabled) |
| |
| def _validate_args(source_table, output_table, summary_table, |
| standardization_table, independent_varname, |
| dependent_varname, hidden_layer_sizes, optimizer_params, |
| warm_start, activation, grouping_col): |
| |
| input_tbl_valid(source_table, "MLP") |
| if not warm_start: |
| output_tbl_valid(output_table, "MLP") |
| output_tbl_valid(summary_table, "MLP") |
| output_tbl_valid(standardization_table, "MLP") |
| |
| _assert(is_var_valid(source_table, independent_varname), |
| "MLP error: invalid independent_varname " |
| "('{independent_varname}') for source_table " |
| "({source_table})!".format( |
| independent_varname=independent_varname, |
| source_table=source_table)) |
| # independent variable should either be a 1D or 2D(minibatch) array |
| _assert("[]" in get_expr_type(independent_varname, source_table), |
| "Independent variable column should refer to an array") |
| |
| _assert(is_var_valid(source_table, dependent_varname), |
| "MLP error: invalid dependent_varname " |
| "('{dependent_varname}') for source_table " |
| "({source_table})!".format( |
| dependent_varname=dependent_varname, source_table=source_table)) |
| |
| _assert(isinstance(hidden_layer_sizes, list), |
| "hidden_layer_sizes must be an array of integers") |
| _assert(all(isinstance(value, int) for value in hidden_layer_sizes), |
| "MLP error: Hidden layers sizes must be integers") |
| _assert(all(value >= 0 for value in hidden_layer_sizes), |
| "MLP error: Hidden layers sizes must be greater than 0.") |
| _assert(optimizer_params["lambda"] >= 0, |
| "MLP error: lambda should be greater than or equal to 0.") |
| _assert(optimizer_params["tolerance"] >= 0, |
| "MLP error: tolerance should be greater than or equal to 0.") |
| _assert(optimizer_params["n_tries"] >= 1, |
| "MLP error: n_tries should be greater than or equal to 1") |
| _assert(optimizer_params["n_iterations"] >= 1, |
| "MLP error: n_iterations should be greater than or equal to 1") |
| _assert(optimizer_params["power"] > 0, |
| "MLP error: power should be greater than 0.") |
| _assert(0 < optimizer_params["gamma"] <= 1, |
| "MLP error: gamma should be between 0 and 1.") |
| _assert(optimizer_params["iterations_per_step"] > 0, |
| "MLP error: iterations_per_step should be greater than 0.") |
| _assert(optimizer_params["learning_rate_init"] > 0, |
| "MLP error: learning_rate_init should be greater than 0.") |
| _assert(optimizer_params["batch_size"] > 0, |
| "MLP error: batch_size should be greater than 0.") |
| _assert(optimizer_params["n_epochs"] > 0, |
| "MLP error: n_epochs should be greater than 0.") |
| _assert(0 <= optimizer_params["momentum"] <= 1, |
| "MLP error: momentum should be in the range [0, 1].") |
| |
| if grouping_col: |
| cols_in_tbl_valid(source_table, |
| _string_to_array_with_quotes(grouping_col), |
| 'MLP', |
| invalid_names=[independent_varname, dependent_varname]) |
| |
| |
| def _get_learning_rate_policy_name(learning_rate_policy): |
| if not learning_rate_policy: |
| learning_rate_policy = 'constant' |
| else: |
| supported_learning_rate_policies = ['constant', 'exp', 'inv', 'step'] |
| try: |
| learning_rate_policy = next( |
| x for x in supported_learning_rate_policies |
| if x.startswith(learning_rate_policy)) |
| except StopIteration: |
| plpy.error( |
| "MLP Error: Invalid learning rate policy: " |
| "{0}. Supported learning rate policies are ({1}).". |
| format(learning_rate_policy, |
| ', '.join(supported_learning_rate_policies))) |
| return learning_rate_policy |
| |
| |
| def _get_activation_function_name(activation): |
| if not activation: |
| activation = 'sigmoid' |
| else: |
| supported_activation_function = ['relu', 'sigmoid', 'tanh'] |
| try: |
| activation = next( |
| x for x in supported_activation_function |
| if x.startswith(activation)) |
| except StopIteration: |
| plpy.error("MLP Error: Invalid activation function: " |
| "{0}. Supported activation functions are ({1}).". |
| format(activation, |
| ', '.join(supported_activation_function))) |
| return activation |
| |
| |
| def _get_activation_index(activation_name): |
| table = {"relu": 0, "sigmoid": 1, "tanh": 2} |
| return table[activation_name] |
| |
| |
| def _format_label(label): |
| if isinstance(label, str): |
| return "'{0}'".format(label) |
| return label |
| |
| def _get_minibatch_param_from_mlp_model_summary(summary_dict, param, |
| minibatch_param): |
| """ |
| Return the value of specific columns from the model summary table. |
| This is to be used only for these params: |
| source_table |
| independent_varname |
| dependent_varname |
| dependent_vartype |
| If the model was trained with minibatch, there would be four new |
| columns introduced, that correspond to the above columns: |
| original_source_table |
| original_independent_varname |
| original_dependent_varname |
| original_dependent_vartype |
| This is because, when minibatch is used, the column names without |
| prefix 'original_' will have the values from the minibatch preprocessed |
| input table, and the column names with the prefix correspond to the |
| original table that was input to the minibatch preprocessing step. |
| |
| Only dependent_vartype or original_dependent_vartype can exist. |
| This is used by predict regression to know if the dependent column |
| is array or not. |
| """ |
| if minibatch_param in summary_dict: |
| return summary_dict[minibatch_param] |
| elif param in summary_dict: |
| return summary_dict[param] |
| |
| return None |
| |
| |
| @MinWarning("error") |
| def mlp_predict(schema_madlib, model_table, data_table, id_col_name, |
| output_table, pred_type='response', **kwargs): |
| """ Score new observations using a trained neural network |
| |
| @param schema_madlib Name of the schema where MADlib is installed |
| @param model_table Name of learned model |
| @param data_table Name of table/view containing the data |
| points to be scored |
| @param id_col_name Name of column in source_table containing |
| (integer) identifier for data point |
| @param output_table Name of table to store the results |
| @param pred_type: str, The type of output required: |
| 'response' gives the actual response values, |
| 'prob' gives the probability of the classes in a |
| For regression, only type='response' is defined. |
| """ |
| input_tbl_valid(model_table, 'MLP') |
| cols_in_tbl_valid(model_table, ['coeff'], 'MLP') |
| summary_table = add_postfix(model_table, "_summary") |
| standardization_table = add_postfix(model_table, "_standardization") |
| _validate_summary_table(summary_table) |
| |
| summary = plpy.execute("SELECT * FROM {0}".format(summary_table))[0] |
| coeff = PY2SQL(plpy.execute( |
| "SELECT * FROM {0}".format(model_table))[0]["coeff"]) |
| dependent_varname = _get_minibatch_param_from_mlp_model_summary(summary, |
| 'dependent_varname', 'original_dependent_varname') |
| independent_varname = _get_minibatch_param_from_mlp_model_summary(summary, |
| 'independent_varname', 'original_independent_varname') |
| source_table = _get_minibatch_param_from_mlp_model_summary(summary, |
| 'source_table', 'original_source_table') |
| activation = _get_activation_index(summary['activation']) |
| layer_sizes = PY2SQL( |
| summary['layer_sizes'], array_type="DOUBLE PRECISION") |
| is_response = int(pred_type == 'response') |
| is_classification = int(summary["is_classification"]) |
| classes = summary['classes'] |
| # Set a flag to indicate that it is a classification model, with an array |
| # as the dependent var. The only scenario where classification allows for |
| # an array dep var is when the user has provided a one-hot encoded dep var |
| # during training, and mlp_classification does not one-hot encode |
| # (and hence classes column in model's summary table is NULL). |
| is_dep_var_an_array_for_classification = int(is_classification and not classes) |
| |
| # Fix to ensure that 1.12 models run on 1.13 or higher. |
| # As a result of adding grouping support in 1.13, some changes were |
| # made wrt standardization. |
| # The x_mean and x_std values were stored in the summary table itself in |
| # MADlib 1.12, and they were named as "x_means" and "x_stds". |
| # From MADlib 1.13 onwards, these parameters were moved to the |
| # _standardization table, and were renamed to "mean" and "std". |
| if 'grouping_col' in summary: |
| # This model was created in MADlib 1.13 or greater version |
| is_pre_113_model = False |
| grouping_col = '' if summary['grouping_col']=='NULL' \ |
| else summary['grouping_col'] |
| else: |
| # This model was created in MADlib 1.12. Grouping was not |
| # supported in 1.12, but was added later in 1.13. |
| is_pre_113_model = True |
| grouping_col = '' |
| # Validate the summary table created with the 1.12 MLP model table. |
| cols_in_tbl_valid(summary_table, ['x_means', 'x_stds'], 'MLP') |
| |
| pred_name = (('"prob_{0}"' if pred_type == "prob" else '"estimated_{0}"'). |
| format(dependent_varname.replace('"', '').strip())) |
| |
| input_tbl_valid(data_table, 'MLP') |
| |
| _assert( |
| is_var_valid(data_table, independent_varname), |
| "MLP Error: independent_varname ('{0}') is invalid for data_table ({1})". |
| format(independent_varname, data_table)) |
| _assert(id_col_name is not None, "MLP Error: id_col_name is NULL") |
| _assert( |
| is_var_valid(data_table, id_col_name), |
| "MLP Error: id_col_name ('{0}') is invalid for {1}".format( |
| id_col_name, data_table)) |
| output_tbl_valid(output_table, 'MLP') |
| |
| header = "CREATE TABLE " + output_table + " AS " |
| select_grouping_col = "" |
| group_by_predict_str = "" |
| grouping_col_comma = "" |
| join_str = '' |
| grouping_col_list = split_quoted_delimited_str(grouping_col) |
| if not is_pre_113_model: |
| _validate_standardization_table(standardization_table, grouping_col_list) |
| if grouping_col: |
| join_str = """JOIN {model_table} |
| USING ({grouping_col}) |
| JOIN {standardization_table} |
| USING ({grouping_col}) |
| """.format(**locals()) |
| group_by = ', '.join(['{0}.{1}'.format(data_table, col) |
| for col in grouping_col_list]) |
| group_by_predict_str = ("ORDER BY {0}, {1}.{2}". |
| format(group_by, data_table, id_col_name)) |
| select_grouping_col = ','.join(['q.{0}'.format(col) |
| for col in grouping_col_list]) + ',' |
| grouping_col_comma = grouping_col+"," |
| |
| coeff_column = "{model_table}.coeff::DOUBLE PRECISION[]".format(**locals()) |
| mean_col = "{standardization_table}.mean".format(**locals()) |
| std_col = "{standardization_table}.std".format(**locals()) |
| else: |
| # if not grouping, then directly read out the coeff, mean |
| # and std values from the model and standardization tables. |
| if is_pre_113_model: |
| # Get mean and std from the summary table |
| standardization = plpy.execute(""" |
| SELECT x_means AS mean, x_stds AS std |
| FROM {0} |
| """.format(summary_table))[0] |
| else: |
| # Get mean and std from the standardization table |
| standardization = plpy.execute(""" |
| SELECT mean, std |
| FROM {0} |
| """.format(standardization_table))[0] |
| coeff = PY2SQL(plpy.execute( |
| "SELECT coeff FROM {0}".format(model_table))[0]["coeff"]) |
| x_means = PY2SQL(standardization['mean'], array_type="DOUBLE PRECISION") |
| x_stds = PY2SQL(standardization['std'], array_type="DOUBLE PRECISION") |
| |
| coeff_column = "{coeff}".format(**locals()) |
| mean_col = "{x_means}".format(**locals()) |
| std_col = "{x_stds}".format(**locals()) |
| |
| predict_uda_query = """{schema_madlib}.internal_predict_mlp( |
| {coeff_column}, |
| {independent_varname}::DOUBLE PRECISION[], |
| {is_classification}, |
| {activation}, |
| {layer_sizes}, |
| {is_response}, |
| {mean_col}, |
| {std_col}, |
| {is_dep_var_an_array_for_classification} |
| ) |
| """.format(**locals()) |
| if is_classification: |
| if pred_type == "response": |
| if classes: |
| prediction_select_clause = "(ARRAY{0})[pred_idx[1]+1] AS {1}".format(classes, pred_name) |
| else: |
| # Case when the training step did not have to one-hot encode |
| # the dependent var. |
| prediction_select_clause = "pred_idx AS {0}".format(pred_name) |
| sql = header + """ |
| SELECT {select_grouping_col} |
| q.{id_col_name}, |
| {prediction_select_clause} |
| FROM ( |
| SELECT {grouping_col_comma} |
| {id_col_name}, |
| {predict_uda_query} AS pred_idx |
| FROM {data_table} |
| {join_str} |
| {group_by_predict_str} |
| ) q |
| """ |
| else: |
| intermediate_col = unique_string() |
| if classes: |
| score_format, _ = create_cols_from_array_sql_string( |
| classes, intermediate_col, 'prob', 'double precision', False, 'MLP') |
| else: |
| # Case when the training step did not have to one-hot encode |
| # the dependent var. |
| score_format = '{0} AS estimated_prob'.format(intermediate_col) |
| sql = header + """ |
| SELECT {select_grouping_col} |
| {id_col_name}, |
| {score_format} |
| FROM ( |
| SELECT {grouping_col_comma} |
| {id_col_name}, |
| {predict_uda_query}::TEXT[] AS {intermediate_col} |
| FROM {data_table} |
| {join_str} |
| {group_by_predict_str} |
| ) q |
| """ |
| else: |
| # Regression |
| dependent_type = _get_minibatch_param_from_mlp_model_summary(summary, |
| 'dependent_vartype', 'original_dependent_vartype') |
| unnest_if_not_array = "" |
| # Return the same type as the user provided. Internally we always |
| # use an array, but if they provided a scalar, unnest it for |
| # the user |
| # If the dependent_type is None, it means that the model was trained |
| # before this column was added (< 1.14). In this case, always unnest the |
| # output. Note that this will return a array of len 1 even if the input is |
| # scalar |
| if dependent_type and "[]" not in dependent_type: |
| unnest_if_not_array = "UNNEST" |
| sql = header + """ |
| SELECT {grouping_col_comma} |
| {id_col_name}, |
| {unnest_if_not_array}({predict_uda_query}) AS {pred_name} |
| FROM {data_table} |
| {join_str} |
| {group_by_predict_str} |
| """ |
| sql = sql.format(**locals()) |
| plpy.execute(sql) |
| |
| |
| def mlp_help(schema_madlib, message, is_classification): |
| method = 'mlp_classification' if is_classification else 'mlp_regression' |
| int_types = ['integer', 'smallint', 'bigint'] |
| text_types = ['text', 'varchar', 'character varying', 'char', 'character'] |
| boolean_types = ['boolean'] |
| supported_types = " " * 33 + ", ".join(text_types) + "\n" +\ |
| " " * 33 + ", ".join(int_types + boolean_types) |
| label_description_classification = "Name of a column which specifies label.\n" +\ |
| " " * 33 + "Supported types are:\n" + supported_types |
| label_description_regression = ( |
| "Dependent variable. May be an array for \n" + " " * 33 + |
| "multiple regression or the name of a column which is any\n" + " " * 33 + |
| "numeric type for single regression") |
| label_description = label_description_classification if is_classification\ |
| else label_description_regression |
| args = dict(schema_madlib=schema_madlib, method=method, |
| label_description=label_description) |
| |
| summary = """ |
| ---------------------------------------------------------------- |
| SUMMARY |
| ---------------------------------------------------------------- |
| Multilayer Perceptron (MLP) is a model for regression and |
| classification |
| |
| Also called "vanilla neural networks", they consist of several |
| fully connected hidden layers with non-linear activation |
| functions. |
| |
| For more details on function usage: |
| SELECT {schema_madlib}.{method}('usage')""".format(**args) |
| |
| usage = """ |
| --------------------------------------------------------------------------- |
| USAGE |
| --------------------------------------------------------------------------- |
| SELECT {schema_madlib}.{method}( |
| source_table, -- TEXT. name of input table |
| output_table, -- TEXT. name of output model table |
| independent_varname, -- TEXT. name of independent variable |
| dependent_varname, -- TEXT. {label_description} |
| hidden_layer_sizes, -- INTEGER[]. Array of integers indicating the |
| number of hidden units per layer. |
| Length equal to the number of hidden layers. |
| optimizer_params, -- TEXT. optional, default NULL |
| parameters for optimization in |
| a comma-separated string of key-value pairs. |
| To find out more: |
| SELECT {schema_madlib}.{method}('optimizer_params') |
| |
| activation -- TEXT. optional, default: 'sigmoid'. |
| supported activations: 'relu', 'sigmoid', |
| and 'tanh' |
| |
| weights -- TEXT. optional, default: NULL. |
| Weights for input rows. Column name which |
| specifies the weight for each input row. |
| This weight will be incorporated into the |
| update during SGD, and will not be used |
| for loss calculations. If not specified, |
| weight for each row will default to 1. |
| Column should be a numeric type. |
| |
| warm_start -- BOOLEAN. optional, default: FALSE. |
| Initalize weights with the coefficients from |
| the last call. If true, weights will |
| be initialized from output_table. Note that |
| all parameters other than optimizer_params, |
| and verbose must remain constant between calls |
| to warm_start. |
| |
| verbose -- BOOLEAN. optional, default: FALSE |
| Provides verbose output of the results of |
| training. |
| |
| grouping_col -- TEXT. optional, default: NULL |
| A single column or a list of comma-separated |
| columns that divides the input data into discrete |
| groups, resulting in one model per group. When |
| this value is NULL, no grouping is used and a |
| single model is generated for all data. |
| ); |
| |
| |
| --------------------------------------------------------------------------- |
| OUTPUT |
| --------------------------------------------------------------------------- |
| The model table produced by MLP contains the following columns: |
| |
| coeffs -- Flat array containing the weights of the neural net |
| loss -- The total loss over the training data. Cross entropy |
| -- for classification and MSE for regression |
| num_iterations -- The total number of training iterations |
| |
| --------------------------------------------------------------------------- |
| The algorithm also creates a summary table named <output_table>_summary |
| that has the following columns: |
| |
| source_table -- The source table. |
| independent_varname -- The independent variables. |
| dependent_varname -- The dependent variable. |
| tolerance -- The tolerance as given in optimizer_params. |
| learning_rate_init -- The initial learning rate as given in optimizer_params. |
| learning_rate_policy -- The learning rate policy as given in optimizer_params. |
| momentum -- Momentum value as given in optimizer_params. |
| nesterov -- Nesterov value as given in optimizer_params. |
| n_iterations -- The number of iterations run. |
| n_tries -- The number of tries as given in optimizer_params. |
| layer_sizes -- The number of units in each layer including the input |
| -- and output layer. |
| activation -- The activation function. |
| is_classification -- True if the model was trained for classification, False |
| -- if it was trained for regression. |
| classes -- The classes which were trained against (empty for |
| -- regression). |
| weights -- The weight column used during training. |
| grouping_col -- NULL if no grouping_col was specified during training, |
| -- and a comma separated list of grouping column names if not. |
| |
| --------------------------------------------------------------------------- |
| The algorithm also creates a standardization table that stores some meta data |
| used during prediction, and is named <output_table>_standardization. It has |
| the following columns: |
| |
| x_means -- The mean for all input features (used for normalization). |
| x_stds -- The standard deviation for all input features (used for |
| -- normalization). |
| grouping columns -- If grouping_col is specified during training, a column for |
| -- each grouping column is created. |
| |
| """.format(**args) |
| |
| optimizer_params = """ |
| ------------------------------------------------------------------------------------------------ |
| OPTIMIZER PARAMS |
| ------------------------------------------------------------------------------------------------ |
| learning_rate_init DOUBLE PRECISION, -- Default: 0.001 |
| Initial learning rate |
| learning_rate_policy VARCHAR, -- Default: 'constant' |
| One of 'constant','exp','inv','step' |
| 'constant': learning_rate = |
| learning_rate_init |
| 'exp': learning_rate = |
| learning_rate_init * gamma^(iter) |
| 'inv': learning_rate = |
| learning_rate_init * (iter+1)^(-power) |
| 'step': learning_rate = |
| learning_rate_init * gamma^(floor(iter/iterations_per_step)) |
| Where iter is the current iteration of SGD. |
| gamma DOUBLE PRECISION, -- Default: '0.1' |
| Decay rate for learning rate. |
| Valid for learning_rate_policy = 'exp', or 'step' |
| power DOUBLE PRECISION, -- Default: '0.5' |
| Exponent for learning_rate_policy = 'inv' |
| iterations_per_step INTEGER, -- Default: '100' |
| Number of iterations to run before decreasing the learning |
| rate by a factor of gamma. Valid for learning rate |
| policy = 'step' |
| n_iterations INTEGER, -- Default: 100 |
| Number of iterations per try |
| n_tries INTEGER, -- Default: 1 |
| Total number of training cycles, |
| with random initializations to avoid |
| local minima. |
| tolerance DOUBLE PRECISION, -- Default: 0.001 |
| If the distance in loss between |
| two iterations is less than the |
| tolerance training will stop, even if |
| n_iterations has not been reached. |
| batch_size, -- Default: 1 for IGD, 20 for Minibatch |
| If the source_table is detected to contain data |
| that is supported by minibatch, then the solver |
| uses mini-batch gradient descent, with the specified |
| batch_size. |
| n_epochs -- Default: 1 for IGD, 10 for Minibatch |
| If the source_table is detected to contain data |
| that is supported by minibatch, then the solver |
| uses mini-batch gradient descent. During gradient |
| descent, n_epochs represents the number of times |
| all batches in a buffer are iterated over. |
| momentum -- Default: 0.9. Momentum can help accelerate |
| learning and avoid local minima when |
| using gradient descent. Value must be in the |
| range 0 to 1, where 0 means no momentum. |
| nesterov -- Default: TRUE. Nesterov momentum can provide |
| better results than using classical momentum alone, |
| due to its look ahead characteristics. In Nesterov |
| momentum, we first move the model in the direction of |
| velocity and use the updated model to calculate |
| the gradient. The main difference being that in |
| classical momentum, we compute the gradient before |
| updating the model whereas in nesterov we first update |
| the model and then compute the gradient from the |
| updated position. |
| """.format(**args) |
| |
| if not message: |
| return summary |
| elif message.lower() in ('usage', 'help', '?'): |
| return usage |
| elif message.lower() == 'optimizer_params': |
| return optimizer_params |
| return """ |
| No such option. Use "SELECT {schema_madlib}.{method}()" for help. |
| """.format(**args) |
| |
| |
| def mlp_predict_help(schema_madlib, message): |
| args = dict(schema_madlib=schema_madlib) |
| |
| summary = """ |
| ---------------------------------------------------------------- |
| SUMMARY |
| ---------------------------------------------------------------- |
| Multilayer Perceptron (MLP) is a model for regression and |
| classification |
| |
| Also called "vanilla neural networks", they consist of several |
| fully connected hidden layers with non-linear activation |
| functions. |
| |
| For more details on function usage: |
| SELECT {schema_madlib}.mlp_predict('usage')""".format(**args) |
| |
| usage = """ |
| --------------------------------------------------------------------------- |
| USAGE |
| --------------------------------------------------------------------------- |
| SELECT {schema_madlib}.mlp_predict( |
| model_table, -- name of model table |
| data_table, -- name of data table |
| id_col_name, -- id column for data table |
| output_table, -- name of output table |
| pred_type -- the type of output requested: |
| -- 'response' gives the actual prediction, |
| -- 'prob' gives the probability of each class. |
| -- for regression, only type='response' is defined. |
| ); |
| |
| |
| --------------------------------------------------------------------------- |
| OUTPUT |
| --------------------------------------------------------------------------- |
| The model table produced by mlp contains the following columns: |
| |
| id -- The provided id for the given input vector |
| |
| estimated_<COL_NAME> -- (For pred_type='response') The estimated class |
| for classification or value for regression, where |
| <COL_NAME> is the name of the column to be |
| predicted from training data |
| |
| prob_<CLASS> -- (For pred_type='prob' for classification) The |
| probability of a given class <CLASS> as given by |
| softmax. There will be one column for each class |
| in the training data. |
| |
| """.format(**args) |
| |
| if not message: |
| return summary |
| elif message.lower() in ('usage', 'help', '?'): |
| return usage |
| return """ |
| No such option. Use "SELECT {schema_madlib}.mlp_predict()" for help. |
| """.format(**args) |
| |
| |
| def check_if_minibatch_enabled(source_table, independent_varname): |
| """ |
| Function to validate if the source_table is converted to a format that |
| can be used for mini-batching. It checks for the dimensionalities of |
| the independent variable to determine the same. |
| """ |
| query = """ |
| SELECT array_upper({0}, 1) AS n_x, |
| array_upper({0}, 2) AS n_y, |
| array_upper({0}, 3) AS n_z |
| FROM {1} |
| LIMIT 1 |
| """.format(independent_varname, source_table) |
| result = plpy.execute(query) |
| |
| if not result: |
| plpy.error("MLP: Input table could be empty.") |
| |
| has_x_dim, has_y_dim, has_z_dim = [bool(result[0][i]) |
| for i in ('n_x', 'n_y', 'n_z')] |
| if not has_x_dim: |
| plpy.error("MLP: {0} is empty.".format(independent_varname)) |
| |
| # error out if >2d matrix |
| if has_z_dim: |
| plpy.error("MLP: Input table is not in the right format.") |
| return has_y_dim |
| |
| |
| class MLPMinibatchPreProcessor: |
| """ |
| This class consumes and validates the pre-processed source table used for |
| MLP mini-batch. This also populates values from the pre-processed summary |
| table which is used by MLP mini-batch |
| |
| """ |
| # summary table columns names |
| DEPENDENT_VARNAME = "dependent_varname" |
| DEPENDENT_VARTYPE = "dependent_vartype" |
| INDEPENDENT_VARNAME = "independent_varname" |
| GROUPING_COL = "grouping_cols" |
| CLASS_VALUES = "class_values" |
| MODEL_TYPE_CLASSIFICATION = "classification" |
| MODEL_TYPE_REGRESSION = "regression" |
| |
| def __init__(self, source_table): |
| self.source_table = source_table |
| self.preprocessed_summary_dict = None |
| self.summary_table = add_postfix(self.source_table, "_summary") |
| self.std_table = add_postfix(self.source_table, "_standardization") |
| |
| self._validate_and_set_preprocessed_summary() |
| |
| def _validate_and_set_preprocessed_summary(self): |
| if not table_exists(self.summary_table) or not table_exists(self.std_table): |
| plpy.error("Tables {0} and/or {1} do not exist. These tables are" |
| " needed for using minibatch during training.". |
| format(self.summary_table, self.std_table)) |
| |
| query = "SELECT * FROM {0}".format(self.summary_table) |
| summary_table_columns = plpy.execute(query) |
| if not summary_table_columns or len(summary_table_columns) == 0: |
| plpy.error("No columns in table {0}.".format(self.summary_table)) |
| else: |
| summary_table_columns = summary_table_columns[0] |
| |
| required_columns = (self.DEPENDENT_VARNAME, self.INDEPENDENT_VARNAME, |
| self.CLASS_VALUES, self.GROUPING_COL, self.DEPENDENT_VARTYPE) |
| if set(required_columns) <= set(summary_table_columns): |
| self.preprocessed_summary_dict = summary_table_columns |
| else: |
| plpy.error("One or more expected columns {0} not present in" |
| " summary table {1}. These columns are" |
| " needed for using minibatch during training.". |
| format(required_columns, self.summary_table)) |