blob: 329a426aec4e946a674ce77ac31aa1ef31b31dfe [file] [log] [blame]
# coding=utf-8
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
@file mlp_igd.py_in
@brief Multilayer perceptron using IGD: Driver functions
@namespace mlp_igd
"""
import math
import plpy
from convex.utils_regularization import utils_ind_var_scales
from convex.utils_regularization import utils_ind_var_scales_grouping
from convex.utils_regularization import __utils_normalize_data
from convex.utils_regularization import __utils_normalize_data_grouping
from internal.db_utils import get_distinct_col_levels
from internal.db_utils import get_one_hot_encoded_expr
from internal.db_utils import quote_literal
from utilities.control import MinWarning
from utilities.in_mem_group_control import GroupIterationController
from utilities.utilities import _array_to_string
from utilities.utilities import _assert
from utilities.utilities import _assert_equal
from utilities.utilities import _string_to_array_with_quotes
from utilities.utilities import add_postfix
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import get_grouping_col_str
from utilities.utilities import is_valid_psql_type
from utilities.utilities import NUMERIC, INTEGER, TEXT, BOOLEAN
from utilities.utilities import INCLUDE_ARRAY, ONLY_ARRAY
from utilities.utilities import py_list_to_sql_string as PY2SQL
from utilities.utilities import strip_end_quotes, split_quoted_delimited_str
from utilities.utilities import unique_string
from utilities.validate_args import array_col_has_same_dimension
from utilities.validate_args import cols_in_tbl_valid
from utilities.validate_args import get_col_dimension
from utilities.validate_args import get_expr_type
from utilities.validate_args import input_tbl_valid
from utilities.validate_args import is_var_valid
from utilities.validate_args import output_tbl_valid
from utilities.validate_args import table_exists
from utilities.validate_args import quote_ident
from utilities.minibatch_validation import validate_dependent_var_for_minibatch
from utilities.utilities import create_cols_from_array_sql_string
# Train a multilayer perceptron with incremental gradient descent (IGD).
#
# Depending on the arguments this trains a classifier (is_classification)
# or a regressor, optionally per group (grouping_col), and optionally on a
# table produced by the MLP mini-batch preprocessor (detected via
# check_if_minibatch_enabled). Training is repeated n_tries times and
# _create_output_table keeps the lowest-loss model (per group) in
# output_table. The standardization parameters are written to
# "<output_table>_standardization"; the "<output_table>_summary" name is
# also computed here. Returns None.
#
# NOTE(review): this copy of the function appears damaged: the docstring
# delimiters, several "else:" lines, the closing arguments of some calls
# and the closing quotes of some SQL strings are missing, and indentation
# is lost. Restore it from version control before relying on it.
def mlp(schema_madlib, source_table, output_table, independent_varname,
dependent_varname, hidden_layer_sizes, optimizer_param_str, activation,
is_classification, weights, warm_start, verbose=False, grouping_col=""):
# NOTE(review): the @param lines below are docstring text whose
# triple-quote delimiters are missing in this copy.
@param schema_madlib
@param source_table
@param output_table
@param independent_varname
@param dependent_varname
@param hidden_layer_sizes
@param optimizer_param_str
warm_start = bool(warm_start)
optimizer_params = _get_optimizer_params(optimizer_param_str or "")
summary_table = add_postfix(output_table, "_summary")
standardization_table = add_postfix(output_table, "_standardization")
hidden_layer_sizes = hidden_layer_sizes or []
_validate_args(source_table, output_table, summary_table,
standardization_table, independent_varname,
dependent_varname, hidden_layer_sizes, optimizer_params,
warm_start, activation, grouping_col)
tolerance = optimizer_params['tolerance']
n_iterations = optimizer_params['n_iterations']
step_size_init = optimizer_params['learning_rate_init']
iterations_per_step = optimizer_params['iterations_per_step']
power = optimizer_params['power']
gamma = optimizer_params['gamma']
step_size = step_size_init
n_tries = optimizer_params['n_tries']
# lambda is a reserved word in python
lambda_ = optimizer_params['lambda']
batch_size = optimizer_params['batch_size']
n_epochs = optimizer_params['n_epochs']
momentum = optimizer_params['momentum']
is_nesterov = optimizer_params['nesterov']
# Note that we don't support weights with mini batching yet, so validate
# this based on is_minibatch_enabled.
weights = '1' if not weights or not weights.strip() else weights.strip()
is_minibatch_enabled = check_if_minibatch_enabled(source_table, independent_varname)
# NOTE(review): the _validate_params_based_on_minibatch(...) and
# _get_learning_rate_policy_name( calls below look truncated (their
# remaining arguments and closing parentheses are missing).
_validate_params_based_on_minibatch(source_table, independent_varname,
dependent_varname, weights,
activation = _get_activation_function_name(activation)
learning_rate_policy = _get_learning_rate_policy_name(
activation_index = _get_activation_index(activation)
# The original dependent_varname is required later if warm start is
# used, and while creating the model summary table. Keep a copy of it
# since dependent_varname is overwritten if one hot encoding is used.
dependent_varname_backup = dependent_varname
classes = []
# Mini-batch path: source_table is itself the preprocessed table, so it is
# used directly as the training relation and the preprocessor's
# standardization table (std_table) supplies mean/std.
if is_minibatch_enabled:
mlp_preprocessor = MLPMinibatchPreProcessor(source_table)
pp_summary_dict = mlp_preprocessor.preprocessed_summary_dict
if (pp_summary_dict[MLPMinibatchPreProcessor.GROUPING_COL]):
# if a valid grouping_col is provided then it should be same as the
# grouping_col used in preprocessing.
_assert(grouping_col == pp_summary_dict[MLPMinibatchPreProcessor.GROUPING_COL],
"MLP: Grouping column input should be same as the one used "
"in the preprocessor.")
# batch_size == 1 is the optimizer default (see _get_optimizer_params);
# in that case use min(200, buffer_size) from the preprocessor summary.
batch_size = min(200, pp_summary_dict['buffer_size'])\
if batch_size == 1 else batch_size
tbl_data_scaled = source_table
col_ind_var_norm_new = MLPMinibatchPreProcessor.INDEPENDENT_VARNAME
col_dep_var_norm_new = MLPMinibatchPreProcessor.DEPENDENT_VARNAME
x_mean_table = mlp_preprocessor.std_table
# NOTE(review): the statements below are incomplete in this copy: the
# next call is missing its closing arguments, the if/else structure is
# missing its "else:" lines, and the two consecutive
# "num_output_nodes = get_col_dimension(source_table," lines look like a
# duplicated statement.
num_input_nodes = get_col_dimension(source_table, independent_varname,
if is_classification:
if pp_summary_dict["class_values"]:
classes = [quote_literal(c) for c in pp_summary_dict["class_values"]]
num_output_nodes = len(classes)
# Assume that the dependent variable is already one-hot-encoded
num_output_nodes = get_col_dimension(source_table,
num_output_nodes = get_col_dimension(source_table,
dependent_varname, dim=2)
dependent_vartype = pp_summary_dict["dependent_vartype"]
# NOTE(review): from here the code appears to be the non-mini-batch branch
# (its "else:" line seems to be missing): the input is standardized into
# a new table tbl_data_scaled and the dependent variable name is switched
# to the standardized column.
grouping_col = grouping_col or ""
x_mean_table = unique_string(desp='x_mean_table')
tbl_data_scaled = unique_string(desp="tbl_data_scaled")
col_ind_var_norm_new = unique_string(desp="ind_var_norm")
col_dep_var_norm_new = unique_string(desp="dep_var_norm")
# Standardize the data, and create a standardized version of the
# source_table in tbl_data_scaled. Use this standardized table for IGD.
num_input_nodes = get_col_dimension(source_table, independent_varname,
dimension = num_input_nodes # dimension is used for normalize
dependent_vartype = get_expr_type(dependent_varname, source_table)
# We are now using tbl_data_scaled, so change the dependent
# varname accordingly.
dependent_varname = col_dep_var_norm_new
if is_classification:
# If dependent variable is an array during classification, assume
# that it is already one-hot-encoded.
if "[]" in dependent_vartype:
num_output_nodes = get_col_dimension(tbl_data_scaled,
classes = get_distinct_col_levels(
source_table, dependent_varname_backup, dependent_vartype)
num_output_nodes = len(classes)
dependent_varname = get_one_hot_encoded_expr(dependent_varname,
if "[]" not in dependent_vartype:
dependent_varname = "ARRAY[{0}]".format(col_dep_var_norm_new)
num_output_nodes = get_col_dimension(tbl_data_scaled,
dependent_varname, dim=1)
reserved_cols = ['coeff', 'loss', 'n_iterations']
grouping_str, grouping_col = get_grouping_col_str(schema_madlib, 'MLP',
# Need layers sizes before validating for warm_start
layer_sizes = [num_input_nodes] + hidden_layer_sizes + [num_output_nodes]
col_grp_key = unique_string(desp='col_grp_key')
# Warm start: start from the coefficients of the existing output_table
# (checked by _validate_warm_start). With grouping, each group fetches its
# own coefficients through a SQL subquery. Without warm start, start_coeff
# is a NULL array and the model initializes itself.
if warm_start:
coeff = _validate_warm_start(output_table, summary_table,
standardization_table, independent_varname,
dependent_varname_backup, layer_sizes,
optimizer_params, is_classification,
weights, warm_start, activation)
if grouping_col:
# get an independent warm start coefficient for each group
grouping_col_list = split_quoted_delimited_str(grouping_col)
join_condition = ' AND '.join(['p.{0} = {0}'.format(col)
for col in grouping_col_list])
start_coeff = """
SELECT coeff
FROM {output_table} as p
WHERE {join_condition} AND
array_to_string(ARRAY[{grouping_str}], ',') = {col_grp_key}
start_coeff = PY2SQL(coeff, array_type="DOUBLE PRECISION")
# if warm start not enabled then initialization is the responsibility of
# the model
start_coeff = "NULL::DOUBLE PRECISION[]"
if grouping_col:
group_by_clause = "GROUP BY {0}, {1}".format(grouping_col, col_grp_key)
grouping_str_comma = grouping_col + ","
using_clause = "USING ({0})".format(col_grp_key)
group_by_clause, grouping_str_comma, using_clause = "", "", "ON TRUE"
# local variables
it_args = {
"schema_madlib": schema_madlib,
"independent_varname": independent_varname,
"dependent_varname": dependent_varname,
"prev_state": None,
"layer_sizes": PY2SQL(layer_sizes, array_type="DOUBLE PRECISION"),
"step_size": step_size,
"source_table": source_table,
"output_table": output_table,
"activation": activation_index,
"is_classification": int(is_classification),
"weights": weights,
"warm_start": warm_start,
"n_iterations": n_iterations,
"tolerance": tolerance,
"lambda_": lambda_,
"grouping_col": grouping_col,
"grouping_str": grouping_str,
"x_mean_table": x_mean_table,
"batch_size": batch_size,
"n_epochs": n_epochs,
"start_coeff": start_coeff,
"momentum": momentum,
"is_nesterov": is_nesterov
# variables to be used by GroupIterationController
'rel_args': unique_string(desp='rel_args'),
'rel_state': unique_string(desp='rel_state'),
'col_grp_iteration': unique_string(desp='col_grp_iteration'),
'col_grp_state': unique_string(desp='col_grp_state'),
'col_grp_key': col_grp_key,
'col_n_tuples': unique_string(desp='col_n_tuples'),
'state_type': "double precision[]",
'rel_source': tbl_data_scaled,
'col_ind_var': col_ind_var_norm_new,
'col_dep_var': col_dep_var_norm_new,
'state_size': -1
# variables used in constructing output tables
'group_by_clause': group_by_clause,
'using_clause': using_clause,
'grouping_str_comma': grouping_str_comma,
first_try = True
temp_output_table = unique_string(desp='temp_output_table')
for _ in range(n_tries):
prev_state = None
iterationCtrl = GroupIterationController(it_args)
with iterationCtrl as it:
it.iteration = 0
while True:
if learning_rate_policy == "exp":
step_size = step_size_init * gamma**it.iteration
elif learning_rate_policy == "inv":
step_size = step_size_init * (it.iteration+1)**(-power)
elif learning_rate_policy == "step":
step_size = step_size_init * gamma**(
math.floor(it.iteration / iterations_per_step))
it.kwargs['step_size'] = step_size
if is_minibatch_enabled:
train_sql = """
({independent_varname})::DOUBLE PRECISION[],
({dependent_varname})::DOUBLE PRECISION[],
({weights})::DOUBLE PRECISION,
({start_coeff})::DOUBLE PRECISION[],
train_sql = """
({col_ind_var})::DOUBLE PRECISION[],
({dependent_varname})::DOUBLE PRECISION[],
({weights})::DOUBLE PRECISION,
({start_coeff})::DOUBLE PRECISION[],
if it.kwargs['state_size'] == -1:
it.kwargs['state_size'] = it.get_state_size()
if it.test("""
{iteration} >= {n_iterations}
abs(_state_previous[{state_size}] -
_state_current[{state_size}]) < {tolerance}
# In verbose mode report, for every group, the loss stored in the last
# element of the current state.
if verbose and it.iteration < n_iterations:
# Get loss value from the state.
res = it.get_param_value_per_group(
"_state_current[array_upper(_state_current, 1)] AS loss")
# Create a list of grouping values if grouping_cols was
# used, it will be an empty list if there was no grouping.
groups = [t[col_grp_key] for t in res if t[col_grp_key]]
losses = [t['loss'] for t in res]
loss = zip(groups, losses) if groups else losses"Iteration: {0}, Loss: <{1}>".
format(it.iteration, ', '.join(map(str, loss))))
# Record the model from this try: the first try creates the temp table
# and later tries insert into it (see _update_temp_model_table).
_update_temp_model_table(it_args, it.iteration, temp_output_table,
is_minibatch_enabled, first_try)
first_try = False
layer_sizes_str = PY2SQL(layer_sizes, array_type="integer")
# Copy the standardization parameters into standardization_table.
if is_minibatch_enabled:
# We already have the mean and std in the input standardization table
input_std_table = add_postfix(source_table, '_standardization')
_create_standardization_table(standardization_table, input_std_table,
# The original input table is the tab_data_scaled for mini batch.
# Do NOT drop tbl_data_scaled and x_mean_table with minibatch,
# it will end up dropping the original data table.
_create_standardization_table(standardization_table, x_mean_table,
# Drop the following tables only for IGD.
plpy.execute("DROP TABLE IF EXISTS {0}".format(tbl_data_scaled))
plpy.execute("DROP TABLE IF EXISTS {0}".format(x_mean_table))
# Keep the lowest-loss try (per group) as the final model, then drop the
# temporary per-try table.
_create_output_table(output_table, temp_output_table, grouping_col,
plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_output_table))
return None
# Standardize the independent variable of args["source_table"] (see the
# docstring text below). Per-group statistics are used when
# args["grouping_col"] is set; otherwise global mean/std strings are used.
# NOTE(review): the docstring delimiters, the "else:" line and most of the
# __utils_normalize_data_grouping / __utils_normalize_data calls appear to
# be missing from this copy; only fragments of their arguments remain.
def normalize_data(args):
Create a new temp table (tbl_data_scaled) with the standardized version
of the independent variable column (independent_varname) in the input
data table (source_table).
# We don't have to standardize the dependent variable.
y_decenter = False
# For MLP we need to change std. dev value of 0 to 1. Set the following
# flag to True to invoke the corresponding utils_regularization method.
set_zero_std_to_one = True
if args["grouping_col"]:
# When grouping_col is defined, we must find an array containing
# the mean and std of every dimension in the independent variable (x)
# specific to groups. Store these results in temp tables x_mean_table
# __utils_normalize_data_grouping reads the various means and stds
# from the tables.
args["dimension"], args["schema_madlib"],
# When no grouping_col is defined, the mean and std for 'x'
# can be defined using strings, stored in x_mean_str, x_std_str.
# We don't need a table like how we needed for grouping.
x_scaled_vals = utils_ind_var_scales(args["source_table"],
x_mean_str = _array_to_string(x_scaled_vals["mean"])
x_std_str = _array_to_string(x_scaled_vals["std"])
return None
# ------------------------------------------------------------------------
# Build the query that creates standardization_table as a copy of
# x_mean_table ("SELECT * FROM x_mean_table"). When warm_start is set, any
# existing standardization_table is dropped first.
# NOTE(review): the end of the query (closing parenthesis and quotes,
# .format(...)) and the plpy.execute call that runs it appear to be
# missing from this copy.
def _create_standardization_table(standardization_table, x_mean_table, warm_start):
if warm_start:
plpy.execute("DROP TABLE IF EXISTS {0}".format(standardization_table))
standarization_table_creation_query = """
CREATE TABLE {standardization_table} AS (
SELECT * FROM {x_mean_table}
# Build the statements that create and fill the model summary table
# args['summary_table']. An existing summary table is dropped first when
# args['warm_start'] is set. With mini-batching (args['is_minibatch_enabled'])
# the table gets extra original_source_table / original_independent_varname /
# original_dependent_varname / batch_size / n_epochs columns. In that case the
# original_* values come from args['pp_summary_dict'], and the dependent type
# column is named original_dependent_vartype. grouping_text is the literal
# string 'NULL' when no grouping column is given.
# NOTE(review): the closing quotes / .format(...) of several SQL strings,
# the plpy.execute calls and an "else:" branch appear to be missing here.
def _create_summary_table(args):
grouping_text = "NULL" if not args['grouping_col'] else args['grouping_col']
if args['warm_start']:
plpy.execute("DROP TABLE IF EXISTS {0}".format(args['summary_table']))
classes_type = args['dependent_vartype']
minibatch_summary_col_names = ''
minibatch_summary_col_vals = ''
dependent_vartype_colname = 'dependent_vartype'
# Mini-batch models additionally record the preprocessor's original inputs.
if args['is_minibatch_enabled']:
# Add a few more columns in the summary table
minibatch_summary_col_names = """
original_source_table TEXT,
original_independent_varname TEXT,
original_dependent_varname TEXT,
batch_size INTEGER,
n_epochs INTEGER,
dependent_vartype_colname = 'original_dependent_vartype'
mlp_pre_dict = args['pp_summary_dict']
source_table = mlp_pre_dict['source_table']
independent_varname = mlp_pre_dict['independent_varname']
dependent_varname = mlp_pre_dict['dependent_varname']
# This variable is used for creating the classes_str column in the model
# summary table. We append [] when we create this column in the create
# summary table command so we need to strip it out here.
if classes_type[-2:] == '[]':
classes_type = classes_type[:-2]
batch_size = args['batch_size']
n_epochs = args['n_epochs']
minibatch_summary_col_vals = """
summary_table_creation_query = """
CREATE TABLE {summary_table}(
source_table TEXT,
independent_varname TEXT,
dependent_varname TEXT,
{dependent_vartype_colname} TEXT,
tolerance FLOAT,
learning_rate_init FLOAT,
learning_rate_policy TEXT,
momentum FLOAT,
nesterov BOOLEAN,
n_iterations INTEGER,
n_tries INTEGER,
layer_sizes INTEGER[],
activation TEXT,
is_classification BOOLEAN,
classes {classes_type}[],
weights VARCHAR,
grouping_col VARCHAR
summary_table_update_query = """
INSERT INTO {summary_table} VALUES(
)""".format(classes_str=PY2SQL(args['classes'], array_type=classes_type,
# Build the query that creates output_table from temp_output_table. It keeps
# the row with the smallest loss: one row per group when grouping_col is set,
# otherwise one row overall. row_number() breaks ties between equal losses
# arbitrarily. When warm_start is set, an existing output_table is dropped
# first.
# NOTE(review): the "FROM (" line that opens the subquery aliased x, the
# closing quotes / .format(...) of the query and the plpy.execute call
# appear to be missing from this copy.
def _create_output_table(output_table, temp_output_table,
grouping_col, warm_start):
grouping_col_comma = ''
partition_by = ''
if grouping_col:
grouping_col_comma = grouping_col + ","
partition_by = " PARTITION BY {0} ".format(grouping_col)
if warm_start:
plpy.execute("DROP TABLE IF EXISTS {0}".format(output_table))
# One row per partition: rank rows by ascending loss, keep rank 1.
build_output_query = """
CREATE TABLE {output_table} AS
SELECT {grouping_col_comma} coeff, loss, num_iterations
SELECT {temp_output_table}.*,
row_number() OVER ({partition_by} ORDER BY loss) AS rank
FROM {temp_output_table}
) x
WHERE x.rank = 1;
# Save the model of one training try (its state at `iteration`) into
# temp_output_table. The first try creates the table with
# "CREATE TEMP TABLE ... as"; later tries use "INSERT INTO". The result UDF is
# internal_mlp_minibatch_result for mini-batch training and
# internal_mlp_igd_result otherwise. With grouping, join_clause appears to
# add a per-group key subquery over source_table. The query is filled from
# **args (e.g. rel_state, col_grp_iteration).
# NOTE(review): several lines are missing from this copy: an "else:" before
# the second internal_result_udf assignment, parts of the SQL, the
# ".format(" opening and the plpy.execute call. As shown, the igd UDF name
# would always win.
def _update_temp_model_table(args, iteration, temp_output_table,
is_minibatch_enabled, first_try):
insert_or_create_str = "INSERT INTO {0}"
if first_try:
insert_or_create_str = "CREATE TEMP TABLE {0} as"
insert_or_create_str = insert_or_create_str.format(temp_output_table)
# Empty unless grouping is used.
join_clause = ''
if args['grouping_col']:
join_clause = """
) AS {col_grp_key}
FROM {source_table}
) grouping_q
if is_minibatch_enabled:
internal_result_udf = "internal_mlp_minibatch_result"
internal_result_udf = "internal_mlp_igd_result"
model_table_query = """
(result).coeff as coeff,
(result).loss as loss,
{iteration} as num_iterations
) AS result,
FROM {rel_state}
WHERE {col_grp_iteration} = {iteration}
) rel_state_subq
iteration=iteration, join_clause=join_clause,
internal_result_udf=internal_result_udf, **args)
def _get_optimizer_params(param_str):
params_defaults = {
"learning_rate_init": (0.001, float),
"n_iterations": (100, int),
"n_tries": (1, int),
"tolerance": (0.001, float),
"learning_rate_policy": ("constant", str),
"gamma": (0.1, float),
"iterations_per_step": (100, int),
"power": (0.5, float),
"lambda": (0, float),
"n_epochs": (1, int),
"batch_size": (1, int),
"momentum": (0.9, float),
"nesterov": (True, bool)
param_defaults = dict([(k, v[0]) for k, v in params_defaults.items()])
param_types = dict([(k, v[1]) for k, v in params_defaults.items()])
if not param_str:
return param_defaults
name_value = extract_keyvalue_params(
param_str, param_types, param_defaults, ignore_invalid=False)
return name_value
def _validate_standardization_table(standardization_table, glist=None):
    """Check that standardization_table exists and has the required columns.

    @param standardization_table: Name of the standardization table.
    @param glist: Optional sequence of grouping column names that must also
                  be present. Defaults to no grouping columns. None is used
                  instead of a mutable [] default.
    """
    glist = list(glist) if glist else []
    input_tbl_valid(standardization_table, 'MLP')
    cols_in_tbl_valid(standardization_table, glist + ['mean', 'std'], 'MLP')
def _validate_summary_table(summary_table):
    """Check that summary_table exists and contains the required columns."""
    required_columns = [
        'dependent_varname', 'independent_varname', 'activation',
        'tolerance', 'learning_rate_init', 'n_iterations', 'n_tries',
        'classes', 'layer_sizes', 'source_table',
    ]
    input_tbl_valid(summary_table, 'MLP')
    cols_in_tbl_valid(summary_table, required_columns, 'MLP')
# Check that warm starting from output_table is possible, and return the
# coefficients to start from. Warm start requires:
# - n_tries == 1;
# - the same independent_varname, dependent_varname, layer_sizes,
#   is_classification, weights and activation as recorded in summary_table;
# - a coefficient count matching layer_sizes.
# Returns the coeff of the last row read from output_table.
# NOTE(review): the "_assert(table_exists(...)," lines for the three
# missing-table messages below, and the assertion that presumably compares
# each row's coeff length with num_coeffs, appear to be missing from this
# copy. table_exists is imported at the top of the file.
def _validate_warm_start(output_table, summary_table, standardization_table,
independent_varname, dependent_varname, layer_sizes,
optimizer_params, is_classification, weights,
warm_start, activation):
"MLP error: Warm start failed due to missing model table: " + output_table)
"MLP error: Warm start failed due to missing summary table: " + summary_table)
"MLP error: Warm start failed due to missing standardization table: " + standardization_table)
_assert(optimizer_params["n_tries"] == 1,
"MLP error: warm_start is only compatible for n_tries = 1")
summary = plpy.execute("SELECT * FROM {0}".format(summary_table))[0]
params = [
"independent_varname", "dependent_varname", "layer_sizes",
"is_classification", "weights", "activation"
# eval(param) reads the local variable named by each entry of params.
# NOTE(review): a lookup in a dict of these locals would avoid eval.
for param in params:
_assert_equal(eval(param), summary[param],
"MLP error: warm start failed due to different parameter value: " +
output = plpy.execute("SELECT * FROM {0}".format(output_table))
# Expected coefficient count: (layer_sizes[i] + 1) * layer_sizes[i + 1]
# summed over consecutive layer pairs (presumably +1 is a bias term).
num_coeffs = sum(
map(lambda i: (layer_sizes[i] + 1) * (layer_sizes[i + 1]),
range(len(layer_sizes) - 1)))
for row in output:
coeff = row['coeff']
"MLP error: Warm start failed to invalid output_table: " +
output_table + ". Invalid number of coefficients in model.")
return coeff
# Validate the type of the dependent variable column of source_table.
# - With mini-batching, classification input is checked by
#   validate_dependent_var_for_minibatch.
# - Otherwise, classification accepts a 1-D numeric array (presumably
#   already one-hot encoded) or an integer/boolean/text column.
# - Regression accepts numeric scalars or arrays.
# NOTE(review): the "validate_dependent_var_for_minibatch(" line and the
# "else:" lines of this function appear to be missing from this copy.
def _validate_dependent_var(source_table, dependent_varname,
is_classification, is_minibatch_enabled):
expr_type = get_expr_type(dependent_varname, source_table)
classification_types = INTEGER | BOOLEAN | TEXT
if is_minibatch_enabled:
if is_classification:
source_table, dependent_varname, expr_type)
if is_classification:
_assert((is_valid_psql_type(expr_type, NUMERIC | ONLY_ARRAY)
and not _get_dep_var_second_dim(dependent_varname, source_table)
or is_valid_psql_type(expr_type, classification_types),
"Dependent variable column should either be a numeric 1-D"
" array, or be of type: {0}".format(','.join(classification_types)))
_assert(is_valid_psql_type(expr_type, NUMERIC | INCLUDE_ARRAY),
"Dependent variable column should be of numeric type.")
def _get_dep_var_second_dim(dependent_varname, source_table):
    """Return array_upper(dependent_varname, 2) from the first row returned.

    The value is None when the dependent variable has fewer than two
    dimensions, so callers can use it to detect 2-D (or higher) arrays.
    """
    query = ("\n"
             "SELECT array_upper({0}, 2) AS n_y\n"
             "FROM {1}\n").format(dependent_varname, source_table)
    rows = plpy.execute(query)
    first_row = rows[0]
    return first_row['n_y']
# Validate parameters whose rules depend on whether the input is
# mini-batched.
# - With mini-batching only the default weights ('1') are allowed.
# - Otherwise the weights expression is presumably required to be numeric.
# - The independent-variable arrays must all have the same length.
# - The dependent variable is checked by _validate_dependent_var.
# NOTE(review): the end of the signature (is_classification and
# is_minibatch_enabled are used below) and the docstring delimiters appear
# to be missing from this copy.
def _validate_params_based_on_minibatch(source_table, independent_varname,
dependent_varname, weights,
Some params have to be validated after knowing if the solver is
minibatch or not.
if is_minibatch_enabled:
_assert(weights == '1',
"MLP Error: The input weights param is not supported with"
" mini-batch version of MLP.")
# NOTE(review): as shown, the result of the is_valid_psql_type(...) check
# below is discarded; the enclosing "_assert(" line, its message and the
# "else:" line appear to be missing.
is_valid_psql_type(get_expr_type(weights, source_table), NUMERIC)
_assert(array_col_has_same_dimension(source_table, independent_varname),
"Independent variable column should refer to arrays of the same length")
_validate_dependent_var(source_table, dependent_varname,
is_classification, is_minibatch_enabled)
# Validate the user-supplied arguments of mlp():
# - the input table;
# - the output, summary and standardization table names, checked with
#   output_tbl_valid when not warm starting;
# - the independent/dependent variable expressions;
# - the hidden layer sizes;
# - the ranges of the optimizer parameters;
# - grouping_col, when given.
# NOTE(review): the ".format(" continuations of the two variable error
# messages and the grouping-column validation call appear partly missing.
def _validate_args(source_table, output_table, summary_table,
standardization_table, independent_varname,
dependent_varname, hidden_layer_sizes, optimizer_params,
warm_start, activation, grouping_col):
input_tbl_valid(source_table, "MLP")
if not warm_start:
output_tbl_valid(output_table, "MLP")
output_tbl_valid(summary_table, "MLP")
output_tbl_valid(standardization_table, "MLP")
_assert(is_var_valid(source_table, independent_varname),
"MLP error: invalid independent_varname "
"('{independent_varname}') for source_table "
# independent variable should either be a 1D or 2D(minibatch) array
_assert("[]" in get_expr_type(independent_varname, source_table),
"Independent variable column should refer to an array")
_assert(is_var_valid(source_table, dependent_varname),
"MLP error: invalid dependent_varname "
"('{dependent_varname}') for source_table "
dependent_varname=dependent_varname, source_table=source_table))
_assert(isinstance(hidden_layer_sizes, list),
"hidden_layer_sizes must be an array of integers")
_assert(all(isinstance(value, int) for value in hidden_layer_sizes),
"MLP error: Hidden layers sizes must be integers")
# NOTE(review): this check accepts 0 while its message says "greater
# than 0"; one of the two is likely wrong.
_assert(all(value >= 0 for value in hidden_layer_sizes),
"MLP error: Hidden layers sizes must be greater than 0.")
_assert(optimizer_params["lambda"] >= 0,
"MLP error: lambda should be greater than or equal to 0.")
_assert(optimizer_params["tolerance"] >= 0,
"MLP error: tolerance should be greater than or equal to 0.")
_assert(optimizer_params["n_tries"] >= 1,
"MLP error: n_tries should be greater than or equal to 1")
_assert(optimizer_params["n_iterations"] >= 1,
"MLP error: n_iterations should be greater than or equal to 1")
_assert(optimizer_params["power"] > 0,
"MLP error: power should be greater than 0.")
_assert(0 < optimizer_params["gamma"] <= 1,
"MLP error: gamma should be between 0 and 1.")
_assert(optimizer_params["iterations_per_step"] > 0,
"MLP error: iterations_per_step should be greater than 0.")
_assert(optimizer_params["learning_rate_init"] > 0,
"MLP error: learning_rate_init should be greater than 0.")
_assert(optimizer_params["batch_size"] > 0,
"MLP error: batch_size should be greater than 0.")
_assert(optimizer_params["n_epochs"] > 0,
"MLP error: n_epochs should be greater than 0.")
_assert(0 <= optimizer_params["momentum"] <= 1,
"MLP error: momentum should be in the range [0, 1].")
# NOTE(review): the grouping-column validation call that the next line's
# keyword argument belongs to appears truncated.
if grouping_col:
invalid_names=[independent_varname, dependent_varname])
def _get_learning_rate_policy_name(learning_rate_policy):
if not learning_rate_policy:
learning_rate_policy = 'constant'
supported_learning_rate_policies = ['constant', 'exp', 'inv', 'step']
learning_rate_policy = next(
x for x in supported_learning_rate_policies
if x.startswith(learning_rate_policy))
except StopIteration:
"MLP Error: Invalid learning rate policy: "
"{0}. Supported learning rate policies are ({1}).".
', '.join(supported_learning_rate_policies)))
return learning_rate_policy
def _get_activation_function_name(activation):
if not activation:
activation = 'sigmoid'
supported_activation_function = ['relu', 'sigmoid', 'tanh']
activation = next(
x for x in supported_activation_function
if x.startswith(activation))
except StopIteration:
plpy.error("MLP Error: Invalid activation function: "
"{0}. Supported activation functions are ({1}).".
', '.join(supported_activation_function)))
return activation
def _get_activation_index(activation_name):
table = {"relu": 0, "sigmoid": 1, "tanh": 2}
return table[activation_name]
def _format_label(label):
if isinstance(label, str):
return "'{0}'".format(label)
return label
def _get_minibatch_param_from_mlp_model_summary(summary_dict, param,
Return the value of specific columns from the model summary table.
This is to be used only for these params:
If the model was trained with minibatch, there would be four new
columns introduced, that correspond to the above columns:
This is because, when minibatch is used, the column names without
prefix 'original_' will have the values from the minibatch preprocessed
input table, and the column names with the prefix correspond to the
original table that was input to the minibatch preprocessing step.
Only dependent_vartype or original_dependent_vartype can exist.
This is used by predict regression to know if the dependent column
is array or not.
if minibatch_param in summary_dict:
return summary_dict[minibatch_param]
elif param in summary_dict:
return summary_dict[param]
return None
# Score data_table with a trained MLP model and build the query that writes
# the results to output_table (the parameters are described in the docstring
# text below).
# - Model metadata is read from "<model_table>_summary" and
#   "<model_table>_standardization".
# - Models created before MADlib 1.13 (no grouping_col column in the summary)
#   keep mean/std as x_means/x_stds in the summary table instead.
# - For classification, pred_type 'response' returns a class label, or the
#   raw internal_predict_mlp output when the model has no classes (a
#   one-hot encoded dependent variable). 'prob' returns class probabilities.
# - For regression, the prediction is unnested when the recorded dependent
#   type is a non-array type.
# NOTE(review): the comment in the regression branch says the output is always
# unnested when dependent_vartype is missing, but as shown UNNEST is only
# applied when dependent_type is set -- confirm.
# NOTE(review): this copy of the function appears damaged. Missing pieces:
# docstring delimiters, several "else:" lines, "_assert(" openings, closing
# quotes of SQL strings and the final plpy.execute.
def mlp_predict(schema_madlib, model_table, data_table, id_col_name,
output_table, pred_type='response', **kwargs):
""" Score new observations using a trained neural network
@param schema_madlib Name of the schema where MADlib is installed
@param model_table Name of learned model
@param data_table Name of table/view containing the data
points to be scored
@param id_col_name Name of column in source_table containing
(integer) identifier for data point
@param output_table Name of table to store the results
@param pred_type: str, The type of output required:
'response' gives the actual response values,
'prob' gives the probability of the classes in a
For regression, only type='response' is defined.
input_tbl_valid(model_table, 'MLP')
cols_in_tbl_valid(model_table, ['coeff'], 'MLP')
summary_table = add_postfix(model_table, "_summary")
standardization_table = add_postfix(model_table, "_standardization")
summary = plpy.execute("SELECT * FROM {0}".format(summary_table))[0]
coeff = PY2SQL(plpy.execute(
"SELECT * FROM {0}".format(model_table))[0]["coeff"])
dependent_varname = _get_minibatch_param_from_mlp_model_summary(summary,
'dependent_varname', 'original_dependent_varname')
independent_varname = _get_minibatch_param_from_mlp_model_summary(summary,
'independent_varname', 'original_independent_varname')
source_table = _get_minibatch_param_from_mlp_model_summary(summary,
'source_table', 'original_source_table')
activation = _get_activation_index(summary['activation'])
layer_sizes = PY2SQL(
summary['layer_sizes'], array_type="DOUBLE PRECISION")
is_response = int(pred_type == 'response')
is_classification = int(summary["is_classification"])
classes = summary['classes']
# Set a flag to indicate that it is a classification model, with an array
# as the dependent var. The only scenario where classification allows for
# an array dep var is when the user has provided a one-hot encoded dep var
# during training, and mlp_classification does not one-hot encode
# (and hence classes column in model's summary table is NULL).
is_dep_var_an_array_for_classification = int(is_classification and not classes)
# Fix to ensure that 1.12 models run on 1.13 or higher.
# As a result of adding grouping support in 1.13, some changes were
# made wrt standardization.
# The x_mean and x_std values were stored in the summary table itself in
# MADlib 1.12, and they were named as "x_means" and "x_stds".
# From MADlib 1.13 onwards, these parameters were moved to the
# _standardization table, and were renamed to "mean" and "std".
if 'grouping_col' in summary:
# This model was created in MADlib 1.13 or greater version
is_pre_113_model = False
grouping_col = '' if summary['grouping_col']=='NULL' \
else summary['grouping_col']
# This model was created in MADlib 1.12. Grouping was not
# supported in 1.12, but was added later in 1.13.
is_pre_113_model = True
grouping_col = ''
# Validate the summary table created with the 1.12 MLP model table.
cols_in_tbl_valid(summary_table, ['x_means', 'x_stds'], 'MLP')
pred_name = (('"prob_{0}"' if pred_type == "prob" else '"estimated_{0}"').
format(dependent_varname.replace('"', '').strip()))
input_tbl_valid(data_table, 'MLP')
is_var_valid(data_table, independent_varname),
"MLP Error: independent_varname ('{0}') is invalid for data_table ({1})".
format(independent_varname, data_table))
_assert(id_col_name is not None, "MLP Error: id_col_name is NULL")
is_var_valid(data_table, id_col_name),
"MLP Error: id_col_name ('{0}') is invalid for {1}".format(
id_col_name, data_table))
output_tbl_valid(output_table, 'MLP')
header = "CREATE TABLE " + output_table + " AS "
select_grouping_col = ""
group_by_predict_str = ""
grouping_col_comma = ""
join_str = ''
grouping_col_list = split_quoted_delimited_str(grouping_col)
if not is_pre_113_model:
_validate_standardization_table(standardization_table, grouping_col_list)
if grouping_col:
join_str = """JOIN {model_table}
USING ({grouping_col})
JOIN {standardization_table}
USING ({grouping_col})
# With grouping, predictions are ordered by the grouping columns and the id.
group_by = ', '.join(['{0}.{1}'.format(data_table, col)
for col in grouping_col_list])
group_by_predict_str = ("ORDER BY {0}, {1}.{2}".
format(group_by, data_table, id_col_name))
select_grouping_col = ','.join(['q.{0}'.format(col)
for col in grouping_col_list]) + ','
grouping_col_comma = grouping_col+","
coeff_column = "{model_table}.coeff::DOUBLE PRECISION[]".format(**locals())
mean_col = "{standardization_table}.mean".format(**locals())
std_col = "{standardization_table}.std".format(**locals())
# if not grouping, then directly read out the coeff, mean
# and std values from the model and standardization tables.
if is_pre_113_model:
# Get mean and std from the summary table
standardization = plpy.execute("""
SELECT x_means AS mean, x_stds AS std
FROM {0}
# Get mean and std from the standardization table
standardization = plpy.execute("""
SELECT mean, std
FROM {0}
# Without grouping, coeff/mean/std are inlined as SQL array literals.
coeff = PY2SQL(plpy.execute(
"SELECT coeff FROM {0}".format(model_table))[0]["coeff"])
x_means = PY2SQL(standardization['mean'], array_type="DOUBLE PRECISION")
x_stds = PY2SQL(standardization['std'], array_type="DOUBLE PRECISION")
coeff_column = "{coeff}".format(**locals())
mean_col = "{x_means}".format(**locals())
std_col = "{x_stds}".format(**locals())
predict_uda_query = """{schema_madlib}.internal_predict_mlp(
{independent_varname}::DOUBLE PRECISION[],
if is_classification:
if pred_type == "response":
if classes:
prediction_select_clause = "(ARRAY{0})[pred_idx[1]+1] AS {1}".format(classes, pred_name)
# Case when the training step did not have to one-hot encode
# the dependent var.
prediction_select_clause = "pred_idx AS {0}".format(pred_name)
sql = header + """
SELECT {select_grouping_col}
SELECT {grouping_col_comma}
{predict_uda_query} AS pred_idx
FROM {data_table}
) q
# pred_type 'prob': when class labels are known, create_cols_from_array_sql_string
# turns the probability array into separate columns (presumably one per
# class -- confirm).
intermediate_col = unique_string()
if classes:
score_format, _ = create_cols_from_array_sql_string(
classes, intermediate_col, 'prob', 'double precision', False, 'MLP')
# Case when the training step did not have to one-hot encode
# the dependent var.
score_format = '{0} AS estimated_prob'.format(intermediate_col)
sql = header + """
SELECT {select_grouping_col}
SELECT {grouping_col_comma}
{predict_uda_query}::TEXT[] AS {intermediate_col}
FROM {data_table}
) q
# Regression
dependent_type = _get_minibatch_param_from_mlp_model_summary(summary,
'dependent_vartype', 'original_dependent_vartype')
unnest_if_not_array = ""
# Return the same type as the user provided. Internally we always
# use an array, but if they provided a scalar, unnest it for
# the user
# If the dependent_type is None, it means that the model was trained
# before this column was added (< 1.14). In this case, always unnest the
# output. Note that this will return a array of len 1 even if the input is
# scalar
if dependent_type and "[]" not in dependent_type:
unnest_if_not_array = "UNNEST"
sql = header + """
SELECT {grouping_col_comma}
{unnest_if_not_array}({predict_uda_query}) AS {pred_name}
FROM {data_table}
# NOTE(review): the closing quotes of the SQL above and the statement that
# executes sql appear to be missing around here.
sql = sql.format(**locals())
def mlp_help(schema_madlib, message, is_classification):
    """
    Return the help text for mlp_classification / mlp_regression.

    Args:
        @param schema_madlib: Schema name substituted into the example SQL.
        @param message: Help topic. Empty/None returns the summary;
                        'usage', 'help' or '?' (case-insensitive) returns the
                        usage text; 'optimizer_params' returns the optimizer
                        parameter reference. Anything else returns a short
                        "no such option" message.
        @param is_classification: Selects the classification (True) or
                                  regression (False) variant of the text.

    Returns:
        str: The requested help text.
    """
    method = 'mlp_classification' if is_classification else 'mlp_regression'

    int_types = ['integer', 'smallint', 'bigint']
    text_types = ['text', 'varchar', 'character varying', 'char', 'character']
    boolean_types = ['boolean']
    # Continuation lines are indented to column 33 so they line up with the
    # description column of the usage text below.
    indent = " " * 33
    supported_types = (indent + ", ".join(text_types) + "\n" +
                       indent + ", ".join(int_types + boolean_types))
    label_description_classification = (
        "Name of a column which specifies label.\n" +
        indent + "Supported types are:\n" + supported_types)
    label_description_regression = (
        "Dependent variable. May be an array for \n" + indent +
        "multiple regression or the name of a column which is any\n" + indent +
        "numeric type for single regression")
    label_description = (label_description_classification
                         if is_classification
                         else label_description_regression)

    # Every placeholder used in the templates below must be present here.
    args = dict(schema_madlib=schema_madlib, method=method,
                label_description=label_description)

    summary = """
    Multilayer Perceptron (MLP) is a model for regression and
    classification.

    Also called "vanilla neural networks", they consist of several
    fully connected hidden layers with non-linear activation
    functions.

    For more details on function usage:
        SELECT {schema_madlib}.{method}('usage')""".format(**args)

    usage = """
    SELECT {schema_madlib}.{method}(
        source_table,         -- TEXT. name of input table
        output_table,         -- TEXT. name of output model table
        independent_varname,  -- TEXT. name of independent variable
        dependent_varname,    -- TEXT. {label_description}
        hidden_layer_sizes,   -- INTEGER[]. Array of integers indicating the
                                 number of hidden units per layer.
                                 Length equal to the number of hidden layers.
        optimizer_params,     -- TEXT. optional, default NULL
                                 parameters for optimization in
                                 a comma-separated string of key-value pairs.
                                 To find out more:
                                 SELECT {schema_madlib}.{method}('optimizer_params')
        activation,           -- TEXT. optional, default: 'sigmoid'.
                                 supported activations: 'relu', 'sigmoid',
                                 and 'tanh'
        weights,              -- TEXT. optional, default: NULL.
                                 Weights for input rows. Column name which
                                 specifies the weight for each input row.
                                 This weight will be incorporated into the
                                 update during SGD, and will not be used
                                 for loss calculations. If not specified,
                                 weight for each row will default to 1.
                                 Column should be a numeric type.
        warm_start,           -- BOOLEAN. optional, default: FALSE.
                                 Initialize weights with the coefficients from
                                 the last call. If true, weights will
                                 be initialized from output_table. Note that
                                 all parameters other than optimizer_params
                                 and verbose must remain constant between calls
                                 to warm_start.
        verbose,              -- BOOLEAN. optional, default: FALSE
                                 Provides verbose output of the results of
                                 training.
        grouping_col          -- TEXT. optional, default: NULL
                                 A single column or a list of comma-separated
                                 columns that divides the input data into discrete
                                 groups, resulting in one model per group. When
                                 this value is NULL, no grouping is used and a
                                 single model is generated for all data.
    );

    The model table produced by MLP contains the following columns:

    coeffs                -- Flat array containing the weights of the neural net
    loss                  -- The total loss over the training data. Cross entropy
                          -- for classification and MSE for regression
    num_iterations        -- The total number of training iterations

    The algorithm also creates a summary table named <output_table>_summary
    that has the following columns:

    source_table          -- The source table.
    independent_varname   -- The independent variables.
    dependent_varname     -- The dependent variable.
    tolerance             -- The tolerance as given in optimizer_params.
    learning_rate_init    -- The initial learning rate as given in optimizer_params.
    learning_rate_policy  -- The learning rate policy as given in optimizer_params.
    momentum              -- Momentum value as given in optimizer_params.
    nesterov              -- Nesterov value as given in optimizer_params.
    n_iterations          -- The number of iterations run.
    n_tries               -- The number of tries as given in optimizer_params.
    layer_sizes           -- The number of units in each layer including the input
                          -- and output layer.
    activation            -- The activation function.
    is_classification     -- True if the model was trained for classification, False
                          -- if it was trained for regression.
    classes               -- The classes which were trained against (empty for
                          -- regression).
    weights               -- The weight column used during training.
    grouping_col          -- NULL if no grouping_col was specified during training,
                          -- and a comma separated list of grouping column names if not.

    The algorithm also creates a standardization table that stores some meta data
    used during prediction, and is named <output_table>_standardization. It has
    the following columns:

    x_means               -- The mean for all input features (used for normalization).
    x_stds                -- The standard deviation for all input features (used for
                          -- normalization).
    grouping columns      -- If grouping_col is specified during training, a column for
                          -- each grouping column is created.
    """.format(**args)

    # No str.format here: this text contains no placeholders.
    optimizer_params = """
    learning_rate_init DOUBLE PRECISION, -- Default: 0.001
        Initial learning rate

    learning_rate_policy VARCHAR, -- Default: 'constant'
        One of 'constant','exp','inv','step'
        'constant': learning_rate =
            learning_rate_init
        'exp': learning_rate =
            learning_rate_init * gamma^(iter)
        'inv': learning_rate =
            learning_rate_init * (iter+1)^(-power)
        'step': learning_rate =
            learning_rate_init * gamma^(floor(iter/iterations_per_step))
        Where iter is the current iteration of SGD.

    gamma DOUBLE PRECISION, -- Default: '0.1'
        Decay rate for learning rate.
        Valid for learning_rate_policy = 'exp', or 'step'

    power DOUBLE PRECISION, -- Default: '0.5'
        Exponent for learning_rate_policy = 'inv'

    iterations_per_step INTEGER, -- Default: '100'
        Number of iterations to run before decreasing the learning
        rate by a factor of gamma. Valid for learning rate
        policy = 'step'

    n_iterations INTEGER, -- Default: 100
        Number of iterations per try

    n_tries INTEGER, -- Default: 1
        Total number of training cycles,
        with random initializations to avoid
        local minima.

    tolerance DOUBLE PRECISION, -- Default: 0.001
        If the distance in loss between
        two iterations is less than the
        tolerance training will stop, even if
        n_iterations has not been reached.

    batch_size -- Default: 1 for IGD, 20 for Minibatch
        If the source_table is detected to contain data
        that is supported by minibatch, then the solver
        uses mini-batch gradient descent, with the specified
        batch size.

    n_epochs -- Default: 1 for IGD, 10 for Minibatch
        If the source_table is detected to contain data
        that is supported by minibatch, then the solver
        uses mini-batch gradient descent. During gradient
        descent, n_epochs represents the number of times
        all batches in a buffer are iterated over.

    momentum -- Default: 0.9. Momentum can help accelerate
        learning and avoid local minima when
        using gradient descent. Value must be in the
        range 0 to 1, where 0 means no momentum.

    nesterov -- Default: TRUE. Nesterov momentum can provide
        better results than using classical momentum alone,
        due to its look ahead characteristics. In Nesterov
        momentum, we first move the model in the direction of
        velocity and use the updated model to calculate
        the gradient. The main difference being that in
        classical momentum, we compute the gradient before
        updating the model whereas in nesterov we first update
        the model and then compute the gradient from the
        updated position.
    """

    if not message:
        return summary
    elif message.lower() in ('usage', 'help', '?'):
        return usage
    elif message.lower() == 'optimizer_params':
        return optimizer_params
    else:
        return """
            No such option. Use "SELECT {schema_madlib}.{method}()" for help.
        """.format(**args)
def mlp_predict_help(schema_madlib, message):
    """
    Return the help text for the mlp_predict SQL function.

    Args:
        @param schema_madlib: Schema name substituted into the example SQL.
        @param message: Help topic. Empty/None returns the summary;
                        'usage', 'help' or '?' (case-insensitive) returns the
                        usage text. Anything else returns a short
                        "no such option" message.

    Returns:
        str: The requested help text.
    """
    args = dict(schema_madlib=schema_madlib)

    summary = """
    Multilayer Perceptron (MLP) is a model for regression and
    classification.

    Also called "vanilla neural networks", they consist of several
    fully connected hidden layers with non-linear activation
    functions.

    For more details on function usage:
        SELECT {schema_madlib}.mlp_predict('usage')""".format(**args)

    # The columns below describe the prediction output table written to
    # output_table, not the model table produced during training.
    usage = """
    SELECT {schema_madlib}.mlp_predict(
        model_table,   -- name of model table
        data_table,    -- name of data table
        id_col_name,   -- id column for data table
        output_table,  -- name of output table
        pred_type      -- the type of output requested:
                       -- 'response' gives the actual prediction,
                       -- 'prob' gives the probability of each class.
                       -- for regression, only type='response' is defined.
    );

    The output table produced by mlp_predict contains the following columns:

    id                    -- The provided id for the given input vector
    estimated_<COL_NAME>  -- (For pred_type='response') The estimated class
                             for classification or value for regression, where
                             <COL_NAME> is the name of the column to be
                             predicted from training data
    prob_<CLASS>          -- (For pred_type='prob' for classification) The
                             probability of a given class <CLASS> as given by
                             softmax. There will be one column for each class
                             in the training data.
    """.format(**args)

    if not message:
        return summary
    elif message.lower() in ('usage', 'help', '?'):
        return usage
    else:
        return """
            No such option. Use "SELECT {schema_madlib}.mlp_predict()" for help.
        """.format(**args)
def check_if_minibatch_enabled(source_table, independent_varname):
    """
    Function to validate if the source_table is converted to a format that
    can be used for mini-batching. It checks for the dimensionalities of
    the independent variable to determine the same.

    Only a single (arbitrary) row of source_table is inspected.

    Args:
        @param source_table: Name of the input table.
        @param independent_varname: Column (or expression) holding the
                                    independent variable array.

    Returns:
        True if independent_varname has a second array dimension (treated
        here as the mini-batch format), False if it is one-dimensional.

    Raises (via plpy.error):
        - if the query returns no rows,
        - if independent_varname has no first dimension (NULL or empty array),
        - if independent_varname has more than two dimensions.
    """
    # Only one row is needed to read the array dimensions. LIMIT 1 stops the
    # query after the first row instead of shipping the whole table into
    # Python just to look at result[0].
    query = """
        SELECT array_upper({0}, 1) AS n_x,
               array_upper({0}, 2) AS n_y,
               array_upper({0}, 3) AS n_z
        FROM {1}
        LIMIT 1
    """.format(independent_varname, source_table)
    result = plpy.execute(query)

    if not result:
        plpy.error("MLP: Input table could be empty.")

    # array_upper returns NULL for a missing dimension, so each flag records
    # whether that dimension exists.
    has_x_dim, has_y_dim, has_z_dim = [bool(result[0][i])
                                       for i in ('n_x', 'n_y', 'n_z')]
    if not has_x_dim:
        plpy.error("MLP: {0} is empty.".format(independent_varname))

    # error out if >2d matrix
    if has_z_dim:
        plpy.error("MLP: Input table is not in the right format.")
    return has_y_dim
class MLPMinibatchPreProcessor:
This class consumes and validates the pre-processed source table used for
MLP mini-batch. This also populates values from the pre-processed summary
table which is used by MLP mini-batch
# summary table columns names
DEPENDENT_VARNAME = "dependent_varname"
DEPENDENT_VARTYPE = "dependent_vartype"
INDEPENDENT_VARNAME = "independent_varname"
GROUPING_COL = "grouping_cols"
CLASS_VALUES = "class_values"
def __init__(self, source_table):
self.source_table = source_table
self.preprocessed_summary_dict = None
self.summary_table = add_postfix(self.source_table, "_summary")
self.std_table = add_postfix(self.source_table, "_standardization")
def _validate_and_set_preprocessed_summary(self):
if not table_exists(self.summary_table) or not table_exists(self.std_table):
plpy.error("Tables {0} and/or {1} do not exist. These tables are"
" needed for using minibatch during training.".
format(self.summary_table, self.std_table))
query = "SELECT * FROM {0}".format(self.summary_table)
summary_table_columns = plpy.execute(query)
if not summary_table_columns or len(summary_table_columns) == 0:
plpy.error("No columns in table {0}.".format(self.summary_table))
summary_table_columns = summary_table_columns[0]
required_columns = (self.DEPENDENT_VARNAME, self.INDEPENDENT_VARNAME,
if set(required_columns) <= set(summary_table_columns):
self.preprocessed_summary_dict = summary_table_columns
plpy.error("One or more expected columns {0} not present in"
" summary table {1}. These columns are"
" needed for using minibatch during training.".
format(required_columns, self.summary_table))