# blob: 1532cb24ff4b386e42fc83d1f0d2fe5f0cfecfa5 [file] [log] [blame]
from __future__ import division, print_function
import plpy
from collections import defaultdict
from kernel_approximation import create_kernel, load_kernel
from utilities.control import MinWarning
from utilities.in_mem_group_control import GroupIterationController
from utilities.utilities import _assert
from utilities.utilities import _string_to_array
from utilities.utilities import _string_to_array_with_quotes
from utilities.utilities import add_postfix
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import get_grouping_col_str
from utilities.utilities import num_features, num_samples
from utilities.utilities import preprocess_keyvalue_params
from utilities.utilities import unique_string
from utilities.validate_args import cols_in_tbl_valid
from utilities.validate_args import explicit_bool_to_text
from utilities.validate_args import get_expr_type
from utilities.validate_args import input_tbl_valid
from utilities.validate_args import is_var_valid
from utilities.validate_args import output_tbl_valid
from validation.internal.cross_validation import CrossValidator
def _compute_svm(args):
    """
    Drive the IGD iterations that fit the SVM coefficients.

    The step size starts at args['init_stepsize'] and is refreshed after
    every pass: multiplied by 'decay_factor' when that factor is positive,
    otherwise reset to init_stepsize / (iteration + 1).

    @param args: dict of settings handed to GroupIterationController; it must
                 contain 'init_stepsize' and receives a 'stepsize' entry here
    @return Number of iterations that has been run
    """
    # SQL fragment evaluated by the controller for one update step.
    step_sql = """
{schema_madlib}.linear_svm_igd_step(
({col_ind_var})::FLOAT8[],
({col_dep_var_trans})::FLOAT8,
{rel_state}.{col_grp_state},
{n_features}::INT4,
{stepsize}::FLOAT8,
{lambda}::FLOAT8,
{is_l2}::BOOLEAN,
{col_n_tuples},
({select_epsilon})::FLOAT8,
{is_svc}::BOOLEAN,
{class_weight_sql}::FLOAT8
)
"""
    # SQL predicate deciding when to stop: iteration cap or small state change.
    stop_sql = """
{iteration} >= {max_iter}
OR {schema_madlib}.internal_linear_svm_igd_distance(
_state_previous, _state_current) < {tolerance}
"""
    starting_stepsize = args['init_stepsize']
    args['stepsize'] = starting_stepsize
    controller = GroupIterationController(args)
    with controller as ctrl:
        ctrl.iteration = 0
        while True:
            ctrl.update(step_sql)
            ctrl.info()
            decay = ctrl.kwargs['decay_factor']
            if decay > 0:
                # exponential decay of the step size
                ctrl.kwargs['stepsize'] *= decay
            else:
                # inverse scaling relative to the initial step size
                ctrl.kwargs['stepsize'] = (
                    starting_stepsize / (ctrl.iteration + 1))
            if ctrl.test(stop_sql):
                break
        ctrl.final()
    return controller.iteration
# ------------------------------------------------------------------------------
def _verify_table(source_table, model_table, dependent_varname,
                  independent_varname, verify_dep=True, **kwargs):
    """
    Validate the training inputs and the names of the tables to be created.

    @param source_table: name of the input table
    @param model_table: name of the model table to be created; the table
                        named <model_table>_summary is checked as well
    @param dependent_varname: dependent variable expression; only checked
                              when verify_dep is True
    @param independent_varname: independent variable expression
    @param verify_dep: when False, all dependent-variable checks are skipped
    """
    # ---- input side ----
    input_tbl_valid(source_table, 'SVM')

    indep_ok = is_var_valid(source_table, independent_varname)
    indep_msg = ("SVM error: invalid independent_varname "
                 "('{independent_varname}') for source_table "
                 "({source_table})!").format(
                     independent_varname=independent_varname,
                     source_table=source_table)
    _assert(indep_ok, indep_msg)

    if verify_dep:
        dep_ok = is_var_valid(source_table, dependent_varname)
        dep_msg = ("SVM error: invalid dependent_varname "
                   "('{dependent_varname}') for source_table "
                   "({source_table})!").format(
                       dependent_varname=dependent_varname,
                       source_table=source_table)
        _assert(dep_ok, dep_msg)

        # an array-typed dependent expression is rejected
        if '[]' in get_expr_type(dependent_varname, source_table):
            plpy.error("SVM error: dependent_varname cannot be of array type!")

    # ---- output side ----
    for out_tbl in (model_table, add_postfix(model_table, "_summary")):
        output_tbl_valid(out_tbl, 'SVM')
# ------------------------------------------------------------------------------
def _verify_get_params_dict(params_dict):
    """
    Check that every tunable parameter has been reduced to a single value.

    Each of the listed entries must not expose '__len__'; otherwise the
    corresponding "should not be a list" assertion fires.

    @param params_dict: dict holding 'lambda', 'epsilon', 'init_stepsize',
                        'decay_factor' and 'max_iter'
    @return the same params_dict object, unchanged
    """
    must_be_scalar = ('lambda', 'epsilon', 'init_stepsize',
                      'decay_factor', 'max_iter')
    for param_name in must_be_scalar:
        is_scalar = not hasattr(params_dict[param_name], '__len__')
        _assert(is_scalar,
                "SVM Error: {0} should not be a list after cross validation!"
                .format(param_name))
    return params_dict
# ------------------------------------------------------------------------------
def _build_output_tables(n_iters_run, args, **kwargs):
    """
    Create the model table and the <model_table>_summary table.

    @param n_iters_run: iteration number whose state row is read from
                        {rel_state}; also written out as num_iterations
    @param args: dict of settings. Read directly here: 'transformer',
                 'use_transformer_for_output', 'model_table', 'grouping_col',
                 'col_grp_key' (plus 'source_table', 'independent_varname',
                 'dependent_varname' when the transformer is not used for
                 output). The SQL templates additionally pull many other
                 placeholders (e.g. schema_madlib, rel_state, mapping,
                 method, lambda, norm, n_folds) from this dict.

    Side effects: saves the transformer as <model_table>_random, executes two
    CREATE TABLE statements through plpy, and mutates args via
    args.update(locals()).
    """
    transformer = args['transformer']
    use_transformer_for_output = args['use_transformer_for_output']
    if use_transformer_for_output:
        # transformer should always be a valid object created using the transform function.
        # Report the names of the table/columns as they were before the
        # kernel transformation.
        ot = transformer.original_table
        independent_varname = ot['independent_varname']
        dependent_varname = ot['dependent_varname']
        source_table = ot['source_table']
        if not dependent_varname:
            # an exception added for the svm_one_class where dependent_varname
            # is artificially injected into the transformed table and does not
            # exist in the original table. Hence we use transformed table
            # to get the expression type
            tt = transformer.transformed_table
            dep_type = get_expr_type(tt['dependent_varname'], tt['source_table'])
        else:
            dep_type = get_expr_type(dependent_varname, source_table)
    else:
        source_table = args['source_table']
        independent_varname = args['independent_varname']
        dependent_varname = args['dependent_varname']
        dep_type = get_expr_type(dependent_varname, source_table)
    model_table = args['model_table']
    random_table = add_postfix(model_table, "_random")
    transformer.save_as(random_table)
    kernel_func = transformer.kernel_func
    kernel_params = transformer.kernel_params
    grouping_col = args['grouping_col']
    col_grp_key = args['col_grp_key']
    if grouping_col:
        groupby_str = "GROUP BY {0}, {1}".format(grouping_col, col_grp_key)
        grouping_str1 = grouping_col + ","
        using_str = "USING ({col_grp_key})".format(col_grp_key=col_grp_key)
    else:
        # without grouping both subqueries are joined unconditionally
        groupby_str, grouping_str1, using_str = "", "", "ON TRUE"
    # organizing results
    # NOTE: every local defined above is injected into args BY NAME and then
    # consumed by the SQL templates below ({source_table}, {dep_type},
    # {groupby_str}, ...). Renaming a local therefore changes which
    # placeholder value the templates see.
    args.update(locals())
    model_table_query = """
CREATE TABLE {model_table} AS
SELECT
{grouping_str1}
(result).coefficients AS coef,
(result).loss AS loss,
(result).norm_of_gradient AS norm_of_gradient,
{n_iters_run} AS num_iterations,
(result).num_rows_processed AS num_rows_processed,
n_tuples_including_nulls - (result).num_rows_processed
AS num_rows_skipped,
ARRAY[{mapping}]::{dep_type}[] AS dep_var_mapping
FROM
(
SELECT
{schema_madlib}.internal_linear_svm_igd_result(
{col_grp_state}
) AS result,
{col_grp_key}
FROM {rel_state}
WHERE {col_grp_iteration} = {n_iters_run}
) rel_state_subq
JOIN
(
SELECT
{grouping_str1}
count(*) AS n_tuples_including_nulls,
array_to_string(ARRAY[{grouping_str}],
','
) AS {col_grp_key}
FROM {source_table}
{groupby_str}
) n_tuples_including_nulls_subq
{using_str}
""".format(**args)
    plpy.execute(model_table_query)
    # summary table
    # a group counts as failed when its coefficient vector is NULL
    n_failed_groups = plpy.execute("""
SELECT count(*) AS num_failed_groups
FROM {0}
WHERE coef IS NULL
""".format(model_table))[0]['num_failed_groups']
    summary_table = add_postfix(model_table, "_summary")
    # the template quotes this value, so "NULL" is stored as the text 'NULL'
    grouping_text = "NULL" if not grouping_col else grouping_col
    # summary_table, grouping_text and n_failed_groups are defined after the
    # args.update(locals()) call above, so they are passed explicitly here.
    plpy.execute("""
CREATE TABLE {summary_table} AS
SELECT
'{method}'::text AS method,
'__MADLIB_VERSION__'::text AS version_number,
'{source_table}'::text AS source_table,
'{model_table}'::text AS model_table,
'{dependent_varname}'::text AS dependent_varname,
'{independent_varname}'::text AS independent_varname,
'{kernel_func}'::text AS kernel_func,
'{kernel_params}'::text AS kernel_params,
'{grouping_text}'::text AS grouping_col,
$$ init_stepsize={init_stepsize},
decay_factor={decay_factor},
max_iter={max_iter},
tolerance={tolerance},
epsilon={epsilon},
eps_table={eps_table},
class_weight={class_weight}
$$::text AS optim_params,
'lambda={lambda}, norm={norm}, n_folds={n_folds}'::text
AS reg_params,
count(*)::integer AS num_all_groups,
{n_failed_groups}::integer AS num_failed_groups,
sum(num_rows_processed)::bigint AS total_rows_processed,
sum(num_rows_skipped)::bigint AS total_rows_skipped
FROM {model_table};
""".format(summary_table=summary_table,
           grouping_text=grouping_text,
           n_failed_groups=n_failed_groups,
           **args))
# ------------------------------------------------------------------------------
def svm_predict_help(schema_madlib, message, **kwargs):
    """
    Return the online help text for svm_predict.

    @param schema_madlib: name of the MADlib schema, substituted into the
                          sample SQL statements
    @param message: help topic. Empty/None returns the summary; 'usage',
                    'help' or '?' returns the syntax description; 'example'
                    returns a worked example; matching is case-insensitive.
                    Anything else returns a "No such option" notice.
    @return str, the requested help text
    """
    args = dict(schema_madlib=schema_madlib)
    summary = """
----------------------------------------------------------------
SUMMARY
----------------------------------------------------------------
Prediction for SVM can be used to obtain a prediction of both the
boolean and continuous value of the dependent variable given a
value of independent variable.
For more details on function usage:
SELECT {schema_madlib}.svm_predict('usage')
For a small example on using the function:
SELECT {schema_madlib}.svm_predict('example')
""".format(**args)
    usage = """
---------------------------------------------------------------------------
PREDICTION
---------------------------------------------------------------------------
The prediction function is used to estimate the conditional mean given a
new predictor. It has the following syntax:
SELECT {schema_madlib}.svm_predict(
model_table, -- TEXT. Model table produced by
the training function.
new_data_table, -- TEXT. Name of the table containing the
prediction data. This table is expected to
contain the same features that were used during
training. The table should also contain
id_col_name used for identifying each row.
id_col_name, -- TEXT. The name of the id column in
the input table.
output_table -- TEXT. Name of the table where output
predictions are written. If this table name is
already in use, then an error is returned. The
table contains the id_col_name column giving
the 'id' for each prediction and the prediction
columns for the dependent variable.
);
""".format(**args)
    # The COPY terminator below is written as '\\.' so that the literal does
    # not rely on an invalid escape sequence; the emitted text is still '\.'.
    example_usage = """
---------------------------------------------------------------------------
EXAMPLES
---------------------------------------------------------------------------
- Create an input data set.
CREATE TABLE houses (id INT, tax INT, bedroom INT, bath FLOAT, price INT,
size INT, lot INT);
COPY houses FROM STDIN WITH DELIMITER '|';
1 | 590 | 2 | 1 | 50000 | 770 | 22100
2 | 1050 | 3 | 2 | 85000 | 1410 | 12000
3 | 20 | 3 | 1 | 22500 | 1060 | 3500
4 | 870 | 2 | 2 | 90000 | 1300 | 17500
5 | 1320 | 3 | 2 | 133000 | 1500 | 30000
6 | 1350 | 2 | 1 | 90500 | 820 | 25700
7 | 2790 | 3 | 2.5 | 260000 | 2130 | 25000
8 | 680 | 2 | 1 | 142500 | 1170 | 22000
9 | 1840 | 3 | 2 | 160000 | 1500 | 19000
10 | 3680 | 4 | 2 | 240000 | 2790 | 20000
11 | 1660 | 3 | 1 | 87000 | 1030 | 17500
12 | 1620 | 3 | 2 | 118600 | 1250 | 20000
13 | 3100 | 3 | 2 | 140000 | 1760 | 38000
14 | 2070 | 2 | 3 | 148000 | 1550 | 14000
15 | 650 | 3 | 1.5 | 65000 | 1450 | 12000
\\.
- Train a classification model, using a linear model.
SELECT {schema_madlib}.svm_classification('houses',
'houses_svm',
'price < 100000',
'ARRAY[1, tax, bath, size]');
- Generate a nonlinear model using a Gaussian kernel. This time we
specify the initial step size and maximum number of iterations to run.
As part of the kernel parameter, we choose 10 as the dimension of the
space where we train SVM. A larger number will lead to a more powerful
model but run the risk of overfitting. As a result, the model will be a
10 dimensional vector, instead of 4 as in the case of linear model.
SELECT {schema_madlib}.svm_classification( 'houses',
'houses_svm_gaussian',
'price < 100000',
'ARRAY[1, tax, bath, size]',
'gaussian',
'n_components=10',
'',
'init_stepsize=1, max_iter=200');
- Use the prediction function to evaluate the models. The predicted
results are in the prediction column and the actual data is in the
target column.
-- For the linear model:
SELECT {schema_madlib}.svm_predict('houses_svm',
'houses',
'id',
'houses_pred');
SELECT *, price < 100000 AS target
FROM houses JOIN houses_pred
USING (id) ORDER BY id;
-- For the Gaussian model:
SELECT {schema_madlib}.svm_predict('houses_svm_gaussian',
'houses',
'id',
'houses_pred_gaussian');
SELECT *, price < 100000 AS target
FROM houses JOIN houses_pred_gaussian
USING (id) ORDER BY id;
""".format(**args)
    if not message:
        return summary
    topic = message.lower()
    if topic in ('usage', 'help', '?'):
        return usage
    if topic == 'example':
        return example_usage
    return """
No such option. Use "SELECT {schema_madlib}.svm_predict()" for help.
""".format(**args)
# ------------------------------------------------------------------------------
def svm_one_class(schema_madlib, source_table, model_table, independent_varname,
                  kernel_func, kernel_params, grouping_col, params,
                  verbose, **kwargs):
    """ Execute the support vector one-class classification algorithm.

    The data in 'source_table' only contains independent variables. The algorithm
    works by learning a classifier between these independent features
    and the origin. The given data is treated as positive data and the origin
    is treated as negative, with higher weight given to the origin to ensure
    a balanced learning update.

    Args:
        @param schema_madlib: str, Name of the MADlib schema
        @param source_table: str, Name of the table with input data
        @param model_table: str, Name of the model table to create
        @param independent_varname: str, Expression for the feature variables
        @param kernel_func: str, Kernel name (or prefix); 'gaussian' is used
                            when empty
        @param kernel_params: str, Key-value parameters for the kernel
        @param grouping_col: str, Comma-separated list of grouping column names
        @param params: str, Key-value optimization/regularization parameters
        @param verbose: bool, Use the "info" message level instead of "error"
    """
    # NOTE: the names of the locals in this function are significant, since
    # 'args = locals()' below forwards them as keyword arguments.
    is_svc = True
    dependent_varname = None
    verbosity_level = "info" if verbose else "error"
    with MinWarning(verbosity_level):
        _verify_table(source_table, model_table,
                      dependent_varname, independent_varname, verify_dep=False)
        # Names that clash with model table columns. 'loss' and
        # 'num_rows_processed' must be separate entries: without the comma
        # between them Python concatenates the two adjacent literals into
        # the single bogus name 'lossnum_rows_processed'.
        reserved_cols = ['coef', 'random_feature_data', 'loss',
                         'num_rows_processed', 'num_rows_skipped',
                         'norm_of_gradient', 'num_iterations']
        grouping_str, grouping_col = get_grouping_col_str(schema_madlib, 'SVM',
                                                          reserved_cols,
                                                          source_table,
                                                          grouping_col)
        if not kernel_func:
            kernel_func = 'gaussian'
        else:
            kernel_func = _get_kernel_name(kernel_func)
        # _transform_w_kernel should always return a transformer. Since
        # override_fit_intercept=True, it should always create a transformed_table
        # containing a intercept along with any kernel transformation in the
        # independent variable array
        transformer = _transform_w_kernel(schema_madlib, source_table,
                                          dependent_varname, independent_varname,
                                          kernel_func, kernel_params,
                                          grouping_col, override_fit_intercept=True)
        # From here on training runs against the transformed table.
        source_table = transformer.transformed_table['source_table']
        independent_varname = transformer.transformed_table['independent_varname']
        dependent_varname = transformer.transformed_table['dependent_varname']
        update_source_for_one_class = True
        args = locals()
        args.update(_extract_params(schema_madlib, params))
        if not args['class_weight']:
            args['class_weight'] = 'balanced'
        _cross_validate_svm(args)
        _svm_parsed_params(use_transformer_for_output=True, **args)
        transformer.clear()
# ------------------------------------------------------------------------------
def get_svc_params_usage_string():
    """
    Return the help text for the optimization/regularization 'params' argument.

    @return str, one section listing each supported name-value parameter
    """
    # Backslashes are doubled so the literal contains no invalid escape
    # sequences; the emitted text still reads $\epsilon$.
    return """
---------------------------------------------------------------------------
OTHER PARAMETERS
---------------------------------------------------------------------------
Parameters are supplied in params argument as a string
containing a comma-delimited list of name-value pairs.
Hyperparameter optimization can be carried out through
the built-in cross validation mechanism
init_stepsize -- Default: [0.01]. Also known as the initial learning rate.
decay_factor -- Default: [0.9].
Control the learning rate schedule:
0 means constant rate; -1 means inverse scaling, i.e.,
stepsize = init_stepsize / iteration;
> 0 means exponential decay, i.e.,
stepsize = init_stepsize * decay_factor^iteration.
max_iter -- Default: [100].
The maximum number of iterations allowed.
tolerance -- Default: 1e-10. The criteria to end iterations.
lambda -- Default: [0.01]. Regularization parameter, positive.
norm -- Default: 'L2'.
Name of the regularization, either 'L2' or 'L1'.
epsilon -- Default: [0.01].
Determines the $\\epsilon$ for $\\epsilon$-regression.
Ignored during classification.
eps_table -- Default: NULL.
Name of the table that contains values of epsilon for
different groups. Ignored when grouping_col is NULL.
validation_result -- Default: NULL.
Name of the table to store the cross validation results
including the values of parameters and
their averaged error values.
n_folds -- Default: 0. Number of folds.
Must be at least 2 to activate cross validation.
"""
# ------------------------------------------------------------------------------
def get_svc_gaussian_usage_string():
    """
    Return the help text for the Gaussian kernel's 'kernel_params' argument.

    @return str, one section listing each supported name-value parameter
    """
    # The backslash is doubled so the literal contains no invalid escape
    # sequence; the emitted text still reads $\gamma$.
    return """
---------------------------------------------------------------------------
GAUSSIAN PARAMETERS
---------------------------------------------------------------------------
Parameters are supplied in kernel_params argument as a string
containing a comma-delimited list of name-value pairs.
gamma -- Default: 1/num_features.
The parameter $\\gamma$ in the Radial Basis
Function kernel.
n_components -- Default: 2*num_features.
The dimensionality of the transformed feature space.
random_state -- Default: 1. Seed used by the random number generator.
"""
# ------------------------------------------------------------------------------
def get_svc_poly_usage_string():
    """
    Return the help text for the polynomial kernel's 'kernel_params' argument.

    @return str, one section listing each supported name-value parameter
    """
    poly_help = """
---------------------------------------------------------------------------
POLYNOMIAL PARAMETERS
---------------------------------------------------------------------------
Parameters are supplied in kernel_params argument as a string
containing a comma-delimited list of name-value pairs.
coef0 -- Default: 1.0.
The independent term q in (xTy + q)^r.
Must be larger or equal to 0. When it is 0,
the polynomial kernel is in homogeneous form.
degree -- Default: 3.
The parameter r in (xTy + q)^r.
n_components -- Default: 2*num_features.
The dimensionality of the transformed feature space.
A larger value lowers the variance of the estimate of
kernel but requires more memory and
takes longer to train.
random_state -- Default: 1. Seed used by the random number generator.
"""
    return poly_help
def svm_one_class_help(schema_madlib, message, is_svc, **kwargs):
    """
    Return the online help text for svm_one_class.

    @param schema_madlib: name of the MADlib schema, substituted into the
                          sample SQL statements
    @param message: help topic. Empty/None returns the summary; 'usage',
                    'help' or '?' returns the syntax and output description;
                    'params', 'gaussian' and 'polynomial' return the matching
                    parameter sections; matching is case-insensitive.
                    Anything else returns a "No such option" notice.
    @param is_svc: accepted for signature compatibility; not used here
    @return str, the requested help text
    """
    method = 'svm_one_class'
    args = dict(schema_madlib=schema_madlib, method=method)
    summary = """
----------------------------------------------------------------
SUMMARY
----------------------------------------------------------------
Support Vector Machines (SVMs) are models for regression
and classification tasks.
SVM models have two particularly desirable features:
robustness in the presence of noisy data and applicability
to a variety of data configurations.
For more details on function usage:
SELECT {schema_madlib}.{method}('usage')
""".format(**args)
    # Column names and the kernel default below mirror what the training
    # code actually produces/uses (dep_var_mapping, kernel_params, and
    # 'gaussian' as the one-class default kernel).
    usage = """
---------------------------------------------------------------------------
USAGE
---------------------------------------------------------------------------
SELECT {schema_madlib}.{method}(
source_table, -- name of input table
model_table, -- name of output model table
independent_varname, -- names of independent variables
kernel_func, -- optional, default: 'gaussian'.
supported type of kernel: 'linear', 'gaussian',
and 'polynomial'
kernel_params, -- optional, default: NULL
parameters for non-linear kernel in a
comma-separated string of key-value pairs. The
parameters differ depending on the value of
kernel_func.
to find out more:
SELECT {schema_madlib}.{method}('kernel_func')
where replace 'kernel_func' with whatever kernel
you are interested in, i.e.,
SELECT {schema_madlib}.{method}('gaussian')
grouping_cols, -- optional, default NULL
names of columns to group-by
params, -- optional, default NULL
parameters for optimization and regularization in
a comma-separated string of key-value pairs. If a
list of values are provided, then cross-
validation will be performed to select the best
value from the list.
to find out more:
SELECT {schema_madlib}.{method}('params')
verbose -- optional, default FALSE
whether to print useful info
);
---------------------------------------------------------------------------
OUTPUT
---------------------------------------------------------------------------
The model table produced by svm contains the following columns:
coef FLOAT8, -- vector of the coefficients.
grouping_key TEXT, -- identifies the group to which
the datum belongs.
num_rows_processed BIGINT, -- numbers of rows processed.
num_rows_skipped BIGINT, -- numbers of rows skipped due
to missing values or failures.
num_iterations INTEGER, -- number of iterations completed by
the optimization algorithm.
The algorithm either converged in this
number of iterations or hit the maximum
number specified in the
optimization parameters.
loss FLOAT8, -- value of the objective function of
SVM. See Technical Background section
below for more details.
norm_of_gradient FLOAT8, -- value of the L2-norm of the
(sub)-gradient of the objective
function.
dep_var_mapping TEXT[], -- vector of dependent variable labels.
The first entry will correspond to -1
and the second to +1, for internal use.
Since the input table does not have a
dependent variable, a new column is
created while learning the one-class SVM
model.
An auxiliary table named <model_table>_random is created if the kernel is not
linear. It contains data needed to embed test data into random feature space
(see reference [2,3]). This data is used internally by svm_predict and not
meaningful on its own.
A summary table named <model_table>_summary is also created at the same time,
which has the following columns:
method varchar, -- 'svm'
version_number varchar, -- version of madlib which was used to
generate the model.
source_table varchar, -- the data source table name.
model_table varchar, -- the model table name.
dependent_varname varchar, -- the dependent variable, created automatically.
independent_varname varchar, -- the independent variables.
kernel_func varchar, -- the kernel function.
kernel_params varchar, -- the kernel parameters.
grouping_col varchar, -- columns on which to group.
optim_params varchar, -- a string containing the
optimization parameters.
reg_params varchar, -- a string containing the
regularization parameters.
num_all_groups integer, -- number of groups in svm training.
num_failed_groups integer, -- number of failed groups in svm training.
total_rows_processed integer, -- total numbers of rows processed
in all groups.
total_rows_skipped integer, -- numbers of rows skipped in all groups
due to missing values or failures.
""".format(**args)
    if not message:
        return summary
    topic = message.lower()
    if topic in ('usage', 'help', '?'):
        return usage
    # The shared parameter sections are only built when they are asked for.
    if topic == 'params':
        return get_svc_params_usage_string()
    if topic == 'gaussian':
        return get_svc_gaussian_usage_string()
    if topic == 'polynomial':
        return get_svc_poly_usage_string()
    return """
No such option. Use "SELECT {schema_madlib}.{method}()" for help.
""".format(**args)
# ------------------------------------------------------------------------------
def svm_help(schema_madlib, message, is_svc, **kwargs):
    """
    Return the online help text for svm_classification / svm_regression.

    @param schema_madlib: name of the MADlib schema, substituted into the
                          sample SQL statements
    @param message: help topic. Empty/None returns the summary; 'usage',
                    'help' or '?' returns the syntax and output description;
                    'params', 'gaussian' and 'polynomial' return the matching
                    parameter sections; matching is case-insensitive.
                    Anything else returns a "No such option" notice.
    @param is_svc: True selects 'svm_classification' as the function name
                   shown in the text, False selects 'svm_regression'
    @return str, the requested help text
    """
    method = 'svm_classification' if is_svc else 'svm_regression'
    args = dict(schema_madlib=schema_madlib, method=method)
    # This function has no 'example' topic of its own, so the summary points
    # at svm_predict('example'), which walks through training and prediction.
    summary = """
----------------------------------------------------------------
SUMMARY
----------------------------------------------------------------
Support Vector Machines (SVMs) are models for regression
and classification tasks.
SVM models have two particularly desirable features:
robustness in the presence of noisy data and applicability
to a variety of data configurations.
For more details on function usage:
SELECT {schema_madlib}.{method}('usage')
For a small end-to-end example (training and prediction):
SELECT {schema_madlib}.svm_predict('example')
""".format(**args)
    # Column names below mirror what the training code actually produces
    # (dep_var_mapping, kernel_params).
    usage = """
---------------------------------------------------------------------------
USAGE
---------------------------------------------------------------------------
SELECT {schema_madlib}.{method}(
source_table, -- name of input table
model_table, -- name of output model table
dependent_varname, -- name of dependent variable
independent_varname, -- names of independent variables
kernel_func, -- optional, default: 'linear'.
supported type of kernel: 'linear', 'gaussian',
and 'polynomial'
kernel_params, -- optional, default: NULL
parameters for non-linear kernel in a
comma-separated string of key-value pairs. The
parameters differ depending on the value of
kernel_func.
to find out more:
SELECT {schema_madlib}.{method}('kernel_func')
where replace 'kernel_func' with whatever kernel
you are interested in, i.e.,
SELECT {schema_madlib}.{method}('gaussian')
grouping_cols, -- optional, default NULL
names of columns to group-by
params, -- optional, default NULL
parameters for optimization and regularization in
a comma-separated string of key-value pairs. If a
list of values are provided, then cross-
validation will be performed to select the best
value from the list.
to find out more:
SELECT {schema_madlib}.{method}('params')
verbose -- optional, default FALSE
whether to print useful info
);
---------------------------------------------------------------------------
OUTPUT
---------------------------------------------------------------------------
The model table produced by svm contains the following columns:
coef FLOAT8, -- vector of the coefficients.
grouping_key TEXT, -- identifies the group to which
the datum belongs.
num_rows_processed BIGINT, -- numbers of rows processed.
num_rows_skipped BIGINT, -- numbers of rows skipped due
to missing values or failures.
num_iterations INTEGER, -- number of iterations completed by
the optimization algorithm.
The algorithm either converged in this
number of iterations or hit the maximum
number specified in the
optimization parameters.
loss FLOAT8, -- value of the objective function of
SVM. See Technical Background section
below for more details.
norm_of_gradient FLOAT8, -- value of the L2-norm of the
(sub)-gradient of the objective
function.
dep_var_mapping TEXT[], -- vector of dependent variable labels.
The first entry will correspond to -1
and the second to +1, for internal use.
An auxiliary table named <model_table>_random is created if the kernel is not
linear. It contains data needed to embed test data into random feature space
(see reference [2,3]). This data is used internally by svm_predict and not
meaningful on its own.
A summary table named <model_table>_summary is also created at the same time,
which has the following columns:
method varchar, -- 'svm'
version_number varchar, -- version of madlib which was used to
generate the model.
source_table varchar, -- the data source table name.
model_table varchar, -- the model table name.
dependent_varname varchar, -- the dependent variable.
independent_varname varchar, -- the independent variables.
kernel_func varchar, -- the kernel function.
kernel_params varchar, -- the kernel parameters.
grouping_col varchar, -- columns on which to group.
optim_params varchar, -- a string containing the
optimization parameters.
reg_params varchar, -- a string containing the
regularization parameters.
num_all_groups integer, -- number of groups in svm training.
num_failed_groups integer, -- number of failed groups in svm training.
total_rows_processed integer, -- total numbers of rows processed
in all groups.
total_rows_skipped integer, -- numbers of rows skipped in all groups
due to missing values or failures.
""".format(**args)
    if not message:
        return summary
    topic = message.lower()
    if topic in ('usage', 'help', '?'):
        return usage
    # The shared parameter sections are only built when they are asked for.
    if topic == 'params':
        return get_svc_params_usage_string()
    if topic == 'gaussian':
        return get_svc_gaussian_usage_string()
    if topic == 'polynomial':
        return get_svc_poly_usage_string()
    return """
No such option. Use "SELECT {schema_madlib}.{method}()" for help.
""".format(**args)
# ------------------------------------------------------------------------------
def svm(schema_madlib, source_table, model_table,
        dependent_varname, independent_varname, kernel_func,
        kernel_params, grouping_col, params, is_svc,
        verbose, **kwargs):
    """
    Train a support vector machine model (classification or regression).

    Args:
        @param schema_madlib: str, Name of the MADlib schema
        @param source_table: str, Name of the table with input data
        @param model_table: str, Name of the model table to create
        @param dependent_varname: str, Expression for the response variable
        @param independent_varname: str, Expression for the feature variables
        @param kernel_func: str, Kernel name (or prefix); 'linear' when empty
        @param kernel_params: str, Key-value parameters for the kernel
        @param grouping_col: str, Comma-separated list of grouping column names
        @param params: str, Key-value optimization/regularization parameters
        @param is_svc: bool, True for classification, False for regression
        @param verbose: bool, Use the "warning" message level instead of "error"
    """
    # NOTE: the names of the locals in this function are significant, since
    # 'args = locals()' below forwards them as keyword arguments.
    # verbosing
    verbosity_level = "warning" if verbose else "error"
    with MinWarning(verbosity_level):
        _verify_table(source_table, model_table,
                      dependent_varname, independent_varname)
        # Names that clash with model table columns. 'loss' and
        # 'num_rows_processed' must be separate entries: without the comma
        # between them Python concatenates the two adjacent literals into
        # the single bogus name 'lossnum_rows_processed'.
        reserved_cols = ['coef', 'random_feature_data', 'loss',
                         'num_rows_processed', 'num_rows_skipped',
                         'norm_of_gradient', 'num_iterations']
        grouping_str, grouping_col = \
            get_grouping_col_str(schema_madlib, 'SVM', reserved_cols,
                                 source_table, grouping_col)
        kernel_func = _get_kernel_name(kernel_func)
        transformer = _transform_w_kernel(schema_madlib, source_table,
                                          dependent_varname, independent_varname,
                                          kernel_func, kernel_params,
                                          grouping_col)
        args = locals()
        args.update(_extract_params(schema_madlib, params))
        # When the kernel produced a transformed table, train on it instead
        # of the original table/columns.
        if transformer.transformed_table:
            args.update(transformer.transformed_table)
        _cross_validate_svm(args)
        _svm_parsed_params(use_transformer_for_output=True, **args)
        transformer.clear()
# ------------------------------------------------------------------------------
def _cross_validate_svm(args):
# updating params_dict will also update args['params_dict']
params_dict = args
if params_dict['n_folds'] > 1 and args['grouping_col']:
plpy.error('SVM Error: cross validation '
'with grouping is not supported!')
cv_params = {}
if len(params_dict['lambda']) > 1:
cv_params['lambda'] = params_dict['lambda']
else:
params_dict['lambda'] = params_dict['lambda'][0]
if len(params_dict['epsilon']) > 1 and not args['is_svc']:
cv_params['epsilon'] = params_dict['epsilon']
else:
params_dict['epsilon'] = params_dict['epsilon'][0]
if len(params_dict['init_stepsize']) > 1:
cv_params['init_stepsize'] = params_dict['init_stepsize']
else:
params_dict['init_stepsize'] = params_dict['init_stepsize'][0]
if len(params_dict['max_iter']) > 1:
cv_params['max_iter'] = params_dict['max_iter']
else:
params_dict['max_iter'] = params_dict['max_iter'][0]
if len(params_dict['decay_factor']) > 1:
cv_params['decay_factor'] = params_dict['decay_factor']
else:
params_dict['decay_factor'] = params_dict['decay_factor'][0]
if not cv_params and params_dict['n_folds'] <= 1:
# no cross validation
return
if cv_params and params_dict['n_folds'] <= 1:
plpy.error("SVM Error: All parameters must be scalar "
"or of length 1 when n_folds is 0 or 1")
if not cv_params and params_dict['n_folds'] > 1:
plpy.warning('SVM Warning: n_folds > 1 but no cross validate params provided'
'Ignoring cross validation request.')
return
scorer = 'classification' if args['is_svc'] else 'regression'
# svm in cross validation should not transform the data,
# since test data in cross validation comes from the transformed source table.
# A linear transformer without intercept is a no-op transformer.
no_op_kernel = create_kernel(args['schema_madlib'], 0,
'linear', {'fit_intercept': False})
no_op_transformer = no_op_kernel.transform(args['source_table'],
args['independent_varname'],
args['dependent_varname'])
transformer = args.get('transformer', no_op_transformer)
args.update(dict(transformer=no_op_transformer))
cv = CrossValidator(_svm_parsed_params, svm_predict, scorer, args)
val_res = cv.validate(cv_params, params_dict['n_folds'])
val_res.output_tbl(params_dict['validation_result'])
params_dict.update(val_res.top('sub_args'))
args.update(dict(transformer=transformer))
# ------------------------------------------------------------------------------
def _get_kernel_name(kernel_func):
if not kernel_func:
kernel_func = 'linear'
else:
# Add non-linear kernels below after implementing them.
supported_kernels = ['linear', 'gaussian', 'polynomial']
try:
# allow user to specify a prefix substring of
# supported kernels. This works because the supported
# kernels have unique prefixes.
kernel_func = next(x for x in supported_kernels
if x.startswith(kernel_func))
except StopIteration:
# next() returns a StopIteration if no element found
plpy.error("SVM Error: Invalid kernel function: "
"{0}. Supported kernel functions are ({1})"
.format(kernel_func, ','.join(sorted(supported_kernels))))
return kernel_func
# ------------------------------------------------------------------------------
def _transform_w_kernel(schema_madlib, source_table, dependent_varname,
                        independent_varname, kernel_func,
                        kernel_params, grouping_col, override_fit_intercept=False):
    """ Apply a kernel transformation to the source table.

    Args:
        @param schema_madlib: str, Name of the MADlib schema
        @param source_table: str, Name of the table with input data
        @param dependent_varname: str, Name of the column containing response variable
        @param independent_varname: str, Name of the column containing feature variables
        @param kernel_func: str, Name of the kernel to apply
        @param kernel_params: str, Key-value set of parameters for the kernel class
        @param grouping_col: str, Comma-separated list of grouping column names
        @param override_fit_intercept: bool, If True, the fit_intercept parameter
                                       in kernel_params is always set to True
                                       independent of user input. No-op if
                                       this is False.
    Returns:
        The object returned by the fitted kernel's transform() call
        (referred to as the "transformer" by the callers in this file).
    """
    feature_count = num_features(source_table, independent_varname)
    kernel_settings = _extract_kernel_params(kernel_params, feature_count)
    if override_fit_intercept:
        kernel_settings['fit_intercept'] = True

    kernel = create_kernel(schema_madlib, feature_count,
                           kernel_func, kernel_settings)
    fitted_kernel = kernel.fit(feature_count)
    return fitted_kernel.transform(source_table, independent_varname,
                                   dependent_varname, grouping_col)
# ------------------------------------------------------------------------------
def _compute_class_weight_sql(source_table, dependent_varname,
is_svc, class_weight_str):
"""
Args:
@param is_svc: Boolean, indicates if classification or regression
Returns:
str. String when executed in SQL computes the class weight for each tuple
"""
if not is_svc or not class_weight_str:
return "1"
dep_to_weight = defaultdict(float)
class_weight_str = class_weight_str.strip()
if class_weight_str == "balanced":
# use half of n_samples since only doing binary classification
# Change the '2' to n_classes for multinomial
n_samples_per_class = num_samples(source_table) / 2
bin_count = plpy.execute("""SELECT {dep} as k, count(*) as v
FROM {src}
GROUP BY {dep}
""".format(dep=dependent_varname,
src=source_table))
for each_count in bin_count:
dep_to_weight[each_count['k']] = n_samples_per_class / each_count['v']
elif _is_class_weights_str_a_mapping(class_weight_str):
# preprocess_keyvalue_params() does not seem to handle special
# chars as expected. TODO: Fix it in MADLIB-1354.
class_weight_splits = preprocess_keyvalue_params(
class_weight_str, split_char=':')
_assert(class_weight_splits and len(class_weight_splits)<=2,
"SVM: Only binary classification is supported. The "
"class_weight param should have at least one and at most "
"two labels in it.")
# Cast the distinct class values' array to a text array since a
# numeric class will show up with suffix 'L' sometimes, and that
# may cause issues when we try to check if a class level specified
# in class_weight (a string) exists in the distinct class levels
# or not.
distinct_class_levels = plpy.execute("""
SELECT array_agg(DISTINCT({0}))::TEXT[] AS labels
FROM {1}
""".format(dependent_varname, source_table))[0]['labels']
for each_pair in class_weight_splits:
k, v = each_pair.split(":")
_assert(k in distinct_class_levels,
"SVM: Key '{0}' in '{1}' is not a valid class label.".
format(k, class_weight_str))
try:
dep_to_weight[k.strip()] = float(v.strip())
except ValueError:
plpy.error("SVM: Weights for a class label must be numeric."
" Invalid class_weights param ({0})".format(
class_weight_str))
else:
plpy.error("SVM: Invalid class_weight param ({0})".format(
class_weight_str))
class_weight_sql = "CASE "
for k, v in dep_to_weight.items():
class_weight_sql += ("WHEN {dep}=$madlib${k}$madlib$ THEN {v}::FLOAT8 \n".
format(dep=dependent_varname, k=k, v=v))
class_weight_sql += "ELSE 1.0 END"
return class_weight_sql
# -------------------------------------------------------------------------
def _is_class_weights_str_a_mapping(class_weight_str):
"""
Check if the class_weight_str begins with a '{' and ends with a '}'
"""
return len(class_weight_str)>2 and class_weight_str[0]=='{' and \
class_weight_str[-1]=='}'
def _svm_parsed_params(schema_madlib, source_table, model_table,
                       dependent_varname, independent_varname,
                       transformer, grouping_str,
                       grouping_col, is_svc,
                       use_transformer_for_output=False,
                       update_source_for_one_class=False,
                       verbose=False, **kwargs):
    """
    Executes the linear support vector algorithm.

    Collects every argument needed by the iterative solver into one dict,
    runs _compute_svm() on it and then calls _build_output_tables(). Nothing
    is returned.

    NOTE: the argument dict is seeded from locals(), and the SQL below is
    formatted with locals() as well, so the names of the parameters and local
    variables in this function are part of its behaviour. Do not rename them.

    Args:
        @param use_transformer_for_output: bool,
            This variable decides if the output tables are created using either
            the 'args' supplied in this function or the 'original_table'
            structure in the transformer. This is necessary to allow creating
            temporary output tables from cross validation which are different
            from the 'original_table' used in the transformer.
        @param update_source_for_one_class: bool,
            This is a special indicator added here for svm_one_class. This has
            to be placed here instead of the svm_one_class function so that
            cross validation undergoes the same transformation for its split
            datasets.
        @param kwargs: remaining solver parameters. 'class_weight' must be
            present; the whole dict is also passed to
            _verify_get_params_dict() and _build_output_tables().
    """
    n_features = num_features(source_table, independent_varname)
    if update_source_for_one_class:
        # This block is run only when the caller is svm_one_class
        # Create a temporary relation with a dependent variable and insert
        # the origin into kernel space. Kernel adds an intercept at the end of the
        # independent_varname. Here an origin is added to the source table, with
        # the final value set to 1.
        # Every source row gets label 1.0; the single synthetic origin row
        # (n_features - 1 zeros followed by 1) gets label -1.
        dependent_varname = unique_string(desp='dep_var')
        source_w_origin = unique_string(desp='src_tbl')
        plpy.execute("""
CREATE TEMP VIEW {source_w_origin} AS
SELECT {independent_varname},
1.0 AS {dependent_varname}
FROM {source_table}
UNION
SELECT
array_append(
{schema_madlib}.array_fill(
{schema_madlib}.array_of_float({n_features} - 1),
0::float)::float[],
1::float
) as {independent_varname},
-1::float as {dependent_varname}
""".format(**locals()))
        # From here on train on the view instead of the caller's table.
        source_table = source_w_origin
        if transformer.transformed_table:
            # Keep the transformer's record of the transformed data in sync
            # with the new source view and generated dependent variable.
            transformer.transformed_table.update(
                dict(source_table=source_w_origin,
                     dependent_varname=dependent_varname))
        # args.update(transformer.transformed_table)
    class_weight_sql = _compute_class_weight_sql(source_table,
                                                 dependent_varname,
                                                 is_svc,
                                                 kwargs['class_weight'])
    # Snapshot of all parameters and locals defined so far (including
    # class_weight_sql and, possibly, the reassigned source_table and
    # dependent_varname); it becomes the argument dict of the solver.
    args = locals()
    args.update({
        # Freshly generated names for the relations/columns used while
        # iterating.
        'rel_args': unique_string(desp='rel_args'),
        'rel_state': unique_string(desp='rel_state'),
        'col_grp_iteration': unique_string(desp='col_grp_iteration'),
        'col_grp_state': unique_string(desp='col_grp_state'),
        'col_grp_key': unique_string(desp='col_grp_key'),
        'col_n_tuples': unique_string(desp='col_n_tuples'),
        'state_type': "double precision[]",
        # Aliases of the source table and variable names under the keys the
        # iteration controller templates refer to.
        'rel_source': args['source_table'],
        'col_ind_var': args['independent_varname'],
        'col_dep_var': args['dependent_varname'],
    })
    # Later updates take precedence over the keys collected above.
    args.update(_verify_get_params_dict(kwargs))
    args.update(_process_epsilon(is_svc, args))
    args.update(_svc_or_svr(is_svc, source_table, dependent_varname))
    # place holder for compatibility
    plpy.execute("DROP TABLE IF EXISTS {0}".format(args['rel_args']))
    plpy.execute("CREATE TABLE pg_temp.{0} AS SELECT 1".format(args['rel_args']))
    # actual iterative algorithm computation
    n_iters_run = _compute_svm(args)
    _build_output_tables(n_iters_run, args, **kwargs)
# -----------------------------------------------------------------------------
def svm_predict(schema_madlib, model_table, new_data_table, id_col_name,
                output_table, **kwargs):
    """ Score data points stored in a table using a learned support vector model.

    Reads the training configuration from the model's '_summary' table,
    applies the stored kernel transformation to new_data_table and creates
    output_table holding, per data point, the id, the prediction and the
    decision function value (plus the grouping columns when the model was
    trained with grouping). Rows whose feature array contains a NULL are
    left out of the output.

    @param schema_madlib    Name of the MADlib schema
    @param model_table      Name of learned model
    @param new_data_table   Name of table/view containing the data
                            points to be scored
    @param id_col_name      Name of column in new_data_table containing
                            (integer) identifier for data point
    @param output_table     Name of table to store the results
    """
    with MinWarning("warning"):
        # model table
        input_tbl_valid(model_table, 'SVM')
        cols_in_tbl_valid(model_table, ['coef'], 'SVM')

        # summary table; 'method' is validated as well since it is read below
        summary_table = add_postfix(model_table, "_summary")
        input_tbl_valid(summary_table, 'SVM')
        cols_in_tbl_valid(summary_table,
                          ['method', 'dependent_varname', 'independent_varname',
                           'kernel_func', 'kernel_params', 'grouping_col'],
                          'SVM')

        # read necessary info from summary
        summary = plpy.execute("""
SELECT
method, dependent_varname, independent_varname,
kernel_func, kernel_params, grouping_col
FROM {summary_table}
""".format(summary_table=summary_table))[0]
        method = summary['method']
        dependent_varname = summary['dependent_varname']
        independent_varname = summary['independent_varname']
        kernel_func = summary['kernel_func']
        kernel_params = summary['kernel_params']
        grouping_col = summary['grouping_col']
        # The summary table stores the text 'NULL' when no grouping was used.
        grouping_col = None if grouping_col == 'NULL' else grouping_col

        input_tbl_valid(new_data_table, 'SVM')
        # Each reserved name must be its own list element: a missing comma
        # between adjacent string literals silently concatenates them.
        reserved_cols = ['coef', 'random_feature_data', 'loss',
                         'num_rows_processed', 'num_rows_skipped',
                         'norm_of_gradient', 'num_iterations']
        grouping_str, grouping_col = get_grouping_col_str(
            schema_madlib, 'SVM', reserved_cols,
            new_data_table, grouping_col)

        _assert(is_var_valid(new_data_table, independent_varname),
                "SVM Error: independent_varname ('" + independent_varname +
                "') is invalid for new_data_table (" + new_data_table + ")!")
        _assert(id_col_name is not None, "SVM Error: id_col_name is NULL!")
        _assert(is_var_valid(new_data_table, id_col_name),
                "SVM Error: id_col_name ('" + id_col_name +
                "') is invalid for new_data_table (" + new_data_table + ")!")
        output_tbl_valid(output_table, 'SVM')

        kernel_params_dict = _extract_kernel_params(kernel_params)
        random_table = add_postfix(model_table, '_random')
        if kernel_func.lower() != 'linear':
            # random table is not created with the linear kernel and ignored
            # in the load_kernel call, hence we disable the check for 'linear'
            input_tbl_valid(random_table, 'SVM')
        transformer = load_kernel(schema_madlib, random_table,
                                  kernel_func, kernel_params_dict)
        transformer.transform(new_data_table, independent_varname,
                              grouping_col=grouping_col, id_col=id_col_name)

        # Score against the transformed relation when the kernel produced
        # one, otherwise against the original input.
        if transformer.transformed_table:
            data_rel_info = transformer.transformed_table
        else:
            data_rel_info = transformer.original_table
        new_data_table = data_rel_info['source_table']
        independent_varname = data_rel_info['independent_varname']
        dependent_varname = data_rel_info['dependent_varname']

        # Signed distance to the separating hyperplane.
        pred_dist = """{0}.array_dot(coef::double precision [],
{1}::double precision [])
""".format(schema_madlib, independent_varname)

        if method.upper() == 'SVC':
            # Non-negative distance maps to the second label of the stored
            # mapping, negative distance to the first.
            pred_query = """
CASE WHEN {schema_madlib}.array_dot(
coef::double precision [],
{independent_varname}::double precision []
) >= 0
THEN dep_var_mapping[2]
ELSE dep_var_mapping[1]
END
""".format(schema_madlib=schema_madlib,
           independent_varname=independent_varname)
        elif method.upper() == 'SVR':
            pred_query = pred_dist
        else:
            plpy.error("SVM Error: Invalid 'method' value in summary table. "
                       "'method' can only be SVC or SVR!")

        # Explicit substitution values for the statements below.
        sql_args = dict(output_table=output_table,
                        id_col_name=id_col_name,
                        pred_query=pred_query,
                        pred_dist=pred_dist,
                        grouping_str=grouping_str,
                        grouping_col=grouping_col,
                        model_table=model_table,
                        new_data_table=new_data_table,
                        schema_madlib=schema_madlib,
                        independent_varname=independent_varname)
        if grouping_col:
            # One model row per group: join model and data on the grouping
            # columns.
            sql = """
CREATE TABLE {output_table} AS
SELECT
{id_col_name} AS {id_col_name},
{pred_query} AS prediction,
{pred_dist} AS decision_function,
ARRAY[{grouping_str}] as grouping_col,
{grouping_col}
FROM {model_table}
JOIN {new_data_table}
USING ({grouping_col})
WHERE not {schema_madlib}.array_contains_null({independent_varname})
ORDER BY grouping_col, {id_col_name}
""".format(**sql_args)
        else:
            sql = """
CREATE TABLE {output_table} AS
SELECT
{id_col_name} AS {id_col_name},
{pred_query} as prediction,
{pred_dist} AS decision_function
FROM
{model_table},
{new_data_table}
WHERE
not {schema_madlib}.array_contains_null({independent_varname})
""".format(**sql_args)
        plpy.execute(sql)
        transformer.clear()
# -----------------------------------------------------------------------------
def _svc_or_svr(is_svc, source_table, dependent_varname):
# transform col_dep_var to binary (1`or -1) if classification
_args = {'col_dep_var_trans': dependent_varname,
'mapping': 'NULL',
'method': 'SVR'}
if is_svc:
# dependent variable mapping
dep_labels = plpy.execute("""
SELECT {dependent_varname} AS y
FROM {source_table}
WHERE ({dependent_varname}) IS NOT NULL
GROUP BY ({dependent_varname})
ORDER BY ({dependent_varname})
""".format(source_table=source_table,
dependent_varname=dependent_varname))
dep_var_mapping = ["'{0}'".format(d['y'])
if isinstance(d['y'], basestring)
else str(d['y']) for d in dep_labels]
_assert(1 <= len(dep_var_mapping) <= 2,
"SVM Error: Classification currently "
"only supports unary or binary output!. Found values {0}".
format(dep_var_mapping))
col_dep_var_trans = ("""
CASE WHEN ({col_dep_var}) IS NULL THEN NULL
WHEN ({col_dep_var}) = {mapped_value_for_negative} THEN -1.0
ELSE 1.0
END
""".format(col_dep_var=dependent_varname,
mapped_value_for_negative=dep_var_mapping[0]))
_args.update({
'mapped_value_for_negative': dep_var_mapping[0],
'col_dep_var_trans': col_dep_var_trans,
'mapping': dep_var_mapping[0] + "," + dep_var_mapping[1],
'method': 'SVC'})
return _args
# -----------------------------------------------------------------------------
def _process_epsilon(is_svc, args):
eps_table = args['eps_table']
grouping_col = args['grouping_col']
grouping_str = args['grouping_str']
col_grp_key = args['col_grp_key']
rel_source = args['rel_source']
epsilon = args['epsilon']
rel_epsilon = ''
select_epsilon = '{0}'.format(epsilon)
as_rel_source = '_src'
if not is_svc and grouping_col and eps_table:
rel_epsilon = unique_string(desp='rel_epsilon')
input_tbl_valid(eps_table, 'SVM')
_assert(is_var_valid(eps_table, grouping_col),
"SVM Error: invalid column names ('{grouping_col}') "
"for eps_table ('{eps_table}')!"
.format(grouping_col=grouping_col,
eps_table=eps_table))
plpy.execute("""
DROP TABLE IF EXISTS {rel_epsilon};
CREATE TEMPORARY TABLE {rel_epsilon} AS (
SELECT
{col_grp_key},
coalesce(epsilon, {epsilon}) AS epsilon
FROM (
SELECT
array_to_string(ARRAY[{grouping_str}], ',') AS
{col_grp_key}
FROM
{rel_source}
GROUP BY {grouping_col}
) q1
LEFT JOIN
(
SELECT
array_to_string(ARRAY[{grouping_str}], ',') AS
{col_grp_key},
epsilon
FROM {eps_table}
) q2
USING ({col_grp_key})
);
""".format(**locals()))
select_epsilon = (
"""
(
SELECT epsilon
FROM
{rel_epsilon}
WHERE
{rel_epsilon}.{col_grp_key} = {as_rel_source}.{col_grp_key}
)
"""
.format(**locals()))
return {'select_epsilon': select_epsilon,
'epsilon': epsilon,
'rel_epsilon': rel_epsilon,
'as_rel_source': as_rel_source}
# -----------------------------------------------------------------------------
def _extract_kernel_params(kernel_params='', n_features=10):
    """
    Parse the kernel_params key-value string into a dict of kernel settings.

    The type and default of every known kernel parameter are handed to
    extract_keyvalue_params(), whose result is returned unchanged.

    @param kernel_params: str, Key-value set of parameters for the kernel
    @param n_features: int, Number of features; determines the defaults of
                       'n_components' and 'gamma'
    """
    # One (name, type, default) entry per parameter.
    param_specs = (
        # common params
        ('n_components', int, max(100, 2 * n_features)),
        ('fit_intercept', bool, False),
        ('random_state', int, 1),
        # polynomial params
        ('degree', int, 3),
        ('coef0', float, 1),
        # gaussian params
        ('fit_in_memory', bool, True),
        ('gamma', float, 1 / n_features),
    )
    params_types = {name: kind for name, kind, _ in param_specs}
    params_default = {name: default for name, _, default in param_specs}
    return extract_keyvalue_params(kernel_params, params_types, params_default)
# -----------------------------------------------------------------------------
def _extract_params(schema_madlib, params, module='SVM'):
    """
    Parse and validate the optimizer parameter string.

    @param schema_madlib: str, Name of the MADlib schema (not used here)
    @param params: str, Key-value set of optimizer parameters
    @param module: str, Module name used as prefix of error messages
    @return dict of parsed parameters. List-valued parameters are converted
            to lists of float (int for 'max_iter'), 'norm' is lower-cased and
            the boolean 'is_l2' is added.
    """
    # NOTICE: the type of values in params_default should be consistent with
    # the types specified in params_types
    params_default = {
        'init_stepsize': [0.01],
        'decay_factor': [0.9],
        'max_iter': [100],
        'tolerance': 1e-10,
        'lambda': [0.01],
        'norm': 'L2',
        'n_folds': 0,
        'validation_result': '',
        'epsilon': [0.01],
        'eps_table': '',
        'class_weight': ''}

    params_types = {
        'init_stepsize': list,
        'decay_factor': list,
        'max_iter': list,
        'tolerance': float,
        'lambda': list,
        'norm': str,
        'n_folds': int,
        'validation_result': str,
        'epsilon': list,
        'eps_table': str,
        'class_weight': str}

    params_vals = extract_keyvalue_params(params, params_types, params_default)

    if params_vals['n_folds'] < 0:
        plpy.error("{0} Error: n_folds must be non-negative!".format(module))

    # The conversions below build real lists (not map() results) so that the
    # values survive the all() checks and can be iterated again by callers
    # regardless of the Python version.

    # validate lambda
    params_vals['lambda'] = [float(v) for v in params_vals['lambda']]
    _assert(all(lmd >= 0 for lmd in params_vals['lambda']),
            "{0} Error: lambda must be non-negative!".format(module))

    # validate epsilon
    params_vals['epsilon'] = [float(v) for v in params_vals['epsilon']]
    _assert(all(e >= 0 for e in params_vals['epsilon']),
            "{0} Error: epsilon must be non-negative!".format(module))

    # validating cross validation is delegated to _cross_validate_svm()
    params_vals['init_stepsize'] = [float(v) for v in params_vals['init_stepsize']]
    _assert(all(e > 0 for e in params_vals['init_stepsize']),
            "{0} Error: init_stepsize must be positive!".format(module))

    params_vals['max_iter'] = [int(v) for v in params_vals['max_iter']]
    _assert(all(e > 0 for e in params_vals['max_iter']),
            "{0} Error: max_iter must be positive!".format(module))

    # Only the upper bound is checked for decay_factor.
    params_vals['decay_factor'] = [float(v) for v in params_vals['decay_factor']]
    _assert(all(e <= 1 for e in params_vals['decay_factor']),
            "{0} Error: decay_factor must be <= 1!".format(module))

    if params_vals['validation_result']:
        # Report problems under the caller's module name, like every other
        # check in this function.
        output_tbl_valid(params_vals['validation_result'], module)

    params_vals['norm'] = params_vals['norm'].lower()
    _assert(params_vals['norm'] in ('l1', 'l2'),
            "{0} Error: norm must be either L1 or L2!".format(module))
    _assert(params_vals['tolerance'] >= 0,
            "{0} error: tolerance must be non-negative!".format(module))

    params_vals['is_l2'] = params_vals['norm'] == 'l2'
    return params_vals
# -------------------------------------------------------------------------
import unittest
class SVMTestCase(unittest.TestCase):
    """
    Unit tests for the key-value parameter preprocessing used by SVM.

    To run them outside the database, comment out "import plpy" and replace
    the plpy.error calls with suitable Python exceptions.
    """

    def setUp(self):
        # Raw optimizer parameter strings, from well-formed to malformed.
        self.optimizer_params1 = 'max_iter=10, optimizer="irls", precision=1e-4'
        self.optimizer_params2 = 'max_iter=2.01, optimizer=newton-irls, precision=1e-5'
        self.optimizer_params3 = 'max_iter=10, 10, optimizer=, lambda={1,2,3,4}'
        self.optimizer_params4 = ('max_iter=10, optimizer="irls",'
                                  'precision=0.02.01, lambda={1,2,3,4}')
        self.optimizer_types = {'max_iter': int, 'optimizer': str,
                                'lambda': list, 'precision': float}

    def test_preprocess_optimizer(self):
        # (raw parameter string, expected list of key=value tokens)
        expectations = (
            (self.optimizer_params1,
             ['max_iter=10', 'optimizer="irls"', 'precision=1e-4']),
            (self.optimizer_params2,
             ['max_iter=2.01', 'optimizer=newton-irls', 'precision=1e-5']),
            (self.optimizer_params3,
             ['max_iter=10', 'lambda={1,2,3,4}']),
            (self.optimizer_params4,
             ['max_iter=10', 'optimizer="irls"', 'precision=0.02',
              'lambda={1,2,3,4}']),
        )
        for raw_params, expected_tokens in expectations:
            self.assertEqual(preprocess_keyvalue_params(raw_params),
                             expected_tokens)
# Run the unit tests defined above when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()