blob: 413dd2a591e190de922177a3ab9171ad8f439830 [file] [log] [blame]
#!/usr/bin/env python
import plpy
from utilities.validate_args import table_exists
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_is_empty
from utilities.validate_args import cols_in_tbl_valid
from utilities.validate_args import input_tbl_valid
from utilities.validate_args import output_tbl_valid
from utilities.validate_args import regproc_valid
def __svm_validate_parameters(madlib_schema, input_table, model_table, parallel,
                              kernel_func, verbose, eta, nu, slambda = 0.2, check_label=True):
    """
    Validate the arguments shared by the SVM training functions.

    @param madlib_schema Schema in which MADlib is installed (accepted for
           signature compatibility with callers; not used for validation)
    @param input_table Name of table/view containing the training data
    @param model_table Name of table under which we want to store the learned model
    @param parallel A flag indicating whether the system should learn multiple models
                    in parallel
    @param kernel_func Kernel function
    @param verbose Verbosity of reporting (accepted for signature compatibility;
           not used for validation)
    @param eta Learning rate in (0,1] (default value is 0.1)
    @param nu Compression parameter in (0,1] associated with the fraction of training
              data that will become support vectors (default value is 0.005)
    @param slambda Regularisation parameter (default value is 0.2)
    @param check_label If True, also require a 'label' column in input_table
           (novelty detection trains without labels and passes False)
    """
    input_tbl_valid(input_table, 'SVM')
    # The 'label' column is only required for regression/classification.
    required_cols = ['id', 'ind', 'label'] if check_label else ['id', 'ind']
    cols_in_tbl_valid(input_table, required_cols, 'SVM')
    output_tbl_valid(model_table, 'SVM')
    if parallel is None:
        plpy.error("SVM error: Argument 'parallel' should not be NULL!")
    if kernel_func is None or kernel_func == '':
        plpy.error("SVM error: Invalid kernel function is provided!")
    # The kernel must resolve to a function with signature (float8[], float8[], float8).
    regproc_valid(kernel_func, "float8[], float8[], float8", 'SVM')
    if eta <= 0 or eta > 1:
        plpy.error("SVM error: Learning rate eta should be in range (0,1].")
    if nu <= 0 or nu > 1:
        plpy.error("SVM error: Compression parameter nu should be in range (0,1].")
def __svm_predict_validate_parameters(input_table, data_col, id_col, model_table, output_table, parallel):
    """
    Validate the arguments of the batch prediction function.

    @param input_table Name of table/view containing the data points to be scored
    @param data_col Name of column in input_table containing the data points
    @param id_col Name of column in input_table containing (integer) identifier for data point
    @param model_table Name of learned model
    @param output_table Name of table to store the results
    @param parallel A flag indicating whether the system should learn multiple models in parallel
    """
    # Both the table being scored and the learned model table must already
    # exist; the output table must not.
    input_tbl_valid(input_table, 'SVM')
    cols_in_tbl_valid(input_table, [data_col, id_col], 'SVM')
    input_tbl_valid(model_table, 'SVM')
    output_tbl_valid(output_table, 'SVM')
    if parallel is None:
        plpy.error("SVM error: Argument 'parallel' should not be NULL!")
# -----------------------------------------------
# Function to run the regression algorithm
# -----------------------------------------------
def svm_regression(madlib_schema, input_table, model_table, parallel, kernel_func,
                   verbose = False, eta = 0.1, nu = 0.005, slambda = 0.2,
                   kernel_param = 1.0):
    """
    Executes the support vector regression algorithm.
    @param madlib_schema Schema in which MADlib is installed
    @param input_table Name of table/view containing the training data
    @param model_table Name of table under which we want to store the learned model
    @param parallel A flag indicating whether the system should learn multiple
           models in parallel
    @param kernel_func Kernel function
    @param verbose Verbosity of reporting
    @param eta Learning rate in (0,1] (default value is 0.1)
    @param nu Compression parameter in (0,1] associated with the fraction of
           training data that will become support vectors (default value is 0.005)
    @param slambda Regularisation parameter (default value is 0.2)
    @param kernel_param Scalar parameter passed to the kernel function (default 1.0)
    @return List of summary tuples, one per model learned:
            (model_table, id, inds, cum_err, epsilon, b, nsvs)
    """
    __svm_validate_parameters(madlib_schema, input_table, model_table, parallel,
                              kernel_func, verbose, eta, nu, slambda)
    # Create the output tables: support vectors and per-model parameters.
    plpy.execute("create table " + model_table
                 + " ( id text, weight float8, sv float8[] ) "
                 "m4_ifdef(`__POSTGRESQL__', `', `distributed randomly')")
    plpy.execute("create table " + model_table
                 + "_param ( id text, intercept float8, kernel text, kernel_param float8) "
                 "m4_ifdef(`__POSTGRESQL__', `', `distributed randomly')")
    # Temp table holding the composite model record(s) produced by the aggregate.
    plpy.execute("create temp table svm_temp_result ( id text, model "
                 + madlib_schema + ".svm_model_rec ) "
                 "m4_ifdef(`__POSTGRESQL__', `', `distributed randomly')")
    if (verbose):
        plpy.info("Parameters:")
        plpy.info(" * input_table = " + input_table)
        plpy.info(" * model_table = " + model_table)
        plpy.info(" * parallel = " + str(parallel))
        plpy.info(" * kernel_func = " + kernel_func)
        plpy.info(" * eta = " + str(eta))
        plpy.info(" * nu = " + str(nu))
        plpy.info(" * slambda = " + str(slambda))
    if (parallel):
        # Learning multiple models in parallel: one model per segment, each
        # stored under the name model_table || gp_segment_id.
        sql = "insert into svm_temp_result (select '" + model_table \
              + "' || m4_ifdef(`__POSTGRESQL__', `0', `gp_segment_id'), " \
              + madlib_schema + ".svm_reg_agg(ind, label, '" + kernel_func \
              + "'," + str(eta) + "," + str(nu) + "," + str(slambda) \
              + "," + str(kernel_param) + ") from " + input_table + " group by 1)"
        plpy.execute(sql)
        # Store the models learned
        plpy.execute("insert into " + model_table
                     + "_param select id, (model).b, '" + kernel_func
                     + "', " + str(kernel_param)
                     + " from svm_temp_result")
        __svm_store_model(madlib_schema, input_table, model_table,
                          'svm_temp_result', model_table)
    else :
        # Learning a single model over the whole input table.
        sql = "insert into svm_temp_result (select '" + model_table + "', " \
              + madlib_schema + ".svm_reg_agg(ind, label, '" + kernel_func \
              + "'," + str(eta) + "," + str(nu) + "," + str(slambda) \
              + "," + str(kernel_param) + ") from " + input_table + ")"
        plpy.execute(sql)
        # Store the model learned
        plpy.execute("insert into " + model_table
                     + "_param select id, (model).b, '" + kernel_func
                     + "', " + str(kernel_param)
                     + " from svm_temp_result")
        plpy.execute("select " + madlib_schema
                     + ".svm_store_model('svm_temp_result', '"
                     + model_table + "', '" + model_table + "')")
    # Retrieve and return the summary for each model learned
    if parallel:
        # Per-segment models contain model_table as a prefix but are not equal to it.
        where_cond = "position('" + model_table + "' in id) > 0 AND '" \
                     + model_table + "' <> id"
    else:
        where_cond = "id = '" + model_table + "'"
    summary = plpy.execute("select id, (model).inds, (model).cum_err, "
                           "(model).epsilon, (model).b, (model).nsvs from "
                           "svm_temp_result where " + where_cond)
    # Accumulate with append (the original re-created the list on every
    # iteration, which is quadratic in the number of models).
    result = []
    for i in range(summary.nrows()):
        result.append((model_table, summary[i]['id'],
                       summary[i]['inds'], summary[i]['cum_err'],
                       summary[i]['epsilon'], summary[i]['b'],
                       summary[i]['nsvs']))
    # Clean up temp storage of models
    plpy.execute('drop table svm_temp_result')
    return result
# -----------------------------------------------
# Function to run the classification algorithm
# -----------------------------------------------
def svm_classification(madlib_schema, input_table, model_table, parallel,
                       kernel_func, verbose=False, eta=0.1, nu=0.005,
                       kernel_param=1.0):
    """
    Executes the support vector classification algorithm.
    @param madlib_schema Schema in which MADlib is installed
    @param input_table Name of table/view containing the training data
    @param model_table Name under which we want to store the learned model
    @param parallel A flag indicating whether the system should learn
           multiple models in parallel
    @param kernel_func Kernel function
    @param verbose Verbosity of reporting
    @param eta Learning rate in (0,1] (default value is 0.1)
    @param nu Compression parameter in (0,1] associated with the fraction of
           training data that will become support vectors (default value is 0.005)
    @param kernel_param Scalar parameter passed to the kernel function (default 1.0)
    @return List of summary tuples, one per model learned:
            (model_table, id, inds, cum_err, rho, b, nsvs)
    """
    __svm_validate_parameters(madlib_schema, input_table, model_table, parallel,
                              kernel_func, verbose, eta, nu)
    # Create the output tables: support vectors and per-model parameters.
    plpy.execute("create table " + model_table
                 + " ( id text, weight float8, sv float8[] ) "
                 "m4_ifdef(`__POSTGRESQL__', `', `distributed randomly')")
    plpy.execute("create table " + model_table
                 + "_param ( id text, intercept float8, kernel text, kernel_param float8) "
                 "m4_ifdef(`__POSTGRESQL__', `', `distributed randomly')")
    # Temp table holding the composite model record(s) produced by the aggregate.
    plpy.execute("create temp table svm_temp_result ( id text, model "
                 + madlib_schema + ".svm_model_rec ) "
                 "m4_ifdef(`__POSTGRESQL__', `', `distributed randomly')")
    if (verbose):
        plpy.info("Parameters:")
        plpy.info(" * input_table = " + input_table)
        plpy.info(" * model_table = " + model_table)
        plpy.info(" * parallel = " + str(parallel))
        plpy.info(" * eta = " + str(eta))
        plpy.info(" * nu = " + str(nu))
    if (parallel) :
        # Learning multiple models in parallel: one model per segment, each
        # stored under the name model_table || gp_segment_id.
        sql = "insert into svm_temp_result (select '" + model_table \
              + "' || m4_ifdef(`__POSTGRESQL__', `0', `gp_segment_id'), " \
              + madlib_schema + ".svm_cls_agg(ind, label,'" + kernel_func \
              + "'," + str(eta) + "," + str(nu) \
              + "," + str(kernel_param) + ") from " + input_table + " group by 1)"
        plpy.execute(sql)
        # Store the models learned
        plpy.execute("insert into " + model_table
                     + "_param select id, (model).b, '" + kernel_func
                     + "', " + str(kernel_param)
                     + " from svm_temp_result")
        __svm_store_model(madlib_schema, input_table, model_table,
                          'svm_temp_result', model_table)
    else :
        # Learning a single model over the whole input table.
        sql = "insert into svm_temp_result (select '" + model_table + "', " \
              + madlib_schema + ".svm_cls_agg(ind, label,'" + kernel_func \
              + "'," + str(eta) + "," + str(nu) \
              + "," + str(kernel_param) + ") from " + input_table + ")"
        plpy.execute(sql)
        # Store the model learned
        plpy.execute("insert into " + model_table
                     + "_param select id, (model).b, '" + kernel_func
                     + "', " + str(kernel_param)
                     + " from svm_temp_result")
        plpy.execute("select " + madlib_schema + ".svm_store_model('svm_temp_result', '"
                     + model_table + "', '" + model_table + "')")
    # Retrieve and return the summary for each model learned
    if parallel:
        # Per-segment models contain model_table as a prefix but are not equal to it.
        where_cond = "position('" + model_table + "' in id) > 0 AND '" \
                     + model_table + "' <> id"
    else:
        where_cond = "id = '" + model_table + "'"
    summary = plpy.execute("select id, (model).inds, (model).cum_err, (model).rho, "
                           "(model).b, (model).nsvs from svm_temp_result where "
                           + where_cond)
    # Accumulate with append (the original re-created the list on every
    # iteration, which is quadratic in the number of models).
    result = []
    for i in range(summary.nrows()):
        result.append((model_table, summary[i]['id'], summary[i]['inds'],
                       summary[i]['cum_err'], summary[i]['rho'],
                       summary[i]['b'], summary[i]['nsvs']))
    # Clean up temp storage of models
    plpy.execute('drop table svm_temp_result')
    return result
# -----------------------------------------------
# Function to run the novelty detection algorithm
# -----------------------------------------------
def svm_novelty_detection(madlib_schema, input_table, model_table, parallel,
                          kernel_func, verbose=False, eta = 0.1, nu = 0.01,
                          kernel_param = 1.0):
    """
    Executes the support vector novelty detection algorithm.
    @param madlib_schema Schema in which MADlib is installed
    @param input_table Name of table/view containing the training data
    @param model_table Name of table under which we want to store the learned model
    @param parallel A flag indicating whether the system should learn multiple
           models in parallel.
    @param kernel_func Kernel function
    @param verbose Verbosity of reporting (default value is False)
    @param eta Learning rate in (0,1] (default value is 0.1)
    @param nu Compression parameter in (0,1] associated with the fraction of training
           data that will become support vectors (default value is 0.01)
    @param kernel_param Scalar parameter passed to the kernel function (default 1.0)
    @return List of summary tuples, one per model learned:
            (model_table, id, inds, rho, nsvs)
    """
    # Coalesce explicit NULL arguments to the documented defaults BEFORE
    # validation (the original validated first, so a NULL eta/nu was
    # rejected by the range checks instead of falling back to its default).
    if verbose is None:
        verbose = False
    if eta is None:
        eta = .1
    if nu is None:
        nu = .01
    # Novelty detection has no labels, hence check_label=False.
    __svm_validate_parameters(madlib_schema, input_table, model_table,
                              parallel, kernel_func, verbose, eta, nu, check_label=False)
    # Create the output tables: support vectors and per-model parameters.
    plpy.execute("create table " + model_table + " ( id text, weight float8, sv float8[] ) "
                 "m4_ifdef(`__POSTGRESQL__', `', `distributed randomly')")
    plpy.execute("create table " + model_table
                 + "_param ( id text, intercept float8, kernel text, kernel_param float8) "
                 "m4_ifdef(`__POSTGRESQL__', `', `distributed randomly')")
    # Temp table holding the composite model record(s) produced by the aggregate.
    plpy.execute("create temp table svm_temp_result ( id text, model " + madlib_schema
                 + ".svm_model_rec ) m4_ifdef(`__POSTGRESQL__', `', `distributed randomly')")
    if (verbose):
        plpy.info("Parameters:")
        plpy.info(" * input_table = " + input_table)
        plpy.info(" * model_table = " + model_table)
        plpy.info(" * parallel = " + str(parallel))
        plpy.info(" * eta = " + str(eta))
        plpy.info(" * nu = " + str(nu))
    if (parallel) :
        # Learning multiple models in parallel: one model per segment, each
        # stored under the name model_table || gp_segment_id.
        sql = "insert into svm_temp_result (select '" + model_table \
              + "' || m4_ifdef(`__POSTGRESQL__', `0', `gp_segment_id'), " \
              + madlib_schema + ".svm_nd_agg(ind,'" + kernel_func + "'," \
              + str(eta) + "," + str(nu) \
              + "," + str(kernel_param) + ") from " + input_table + " group by 1)"
        plpy.execute(sql)
        # Store the models learned; the intercept is -rho for novelty detection.
        plpy.execute("insert into " + model_table
                     + "_param select id, (model).rho * -1.0, '" + kernel_func
                     + "', " + str(kernel_param)
                     + " from svm_temp_result")
        __svm_store_model(madlib_schema, input_table, model_table,
                          'svm_temp_result', model_table)
    else :
        # Learning a single model over the whole input table.
        sql = "insert into svm_temp_result (select '" + model_table \
              + "', " + madlib_schema + ".svm_nd_agg(ind,'" + kernel_func \
              + "'," + str(eta) + "," + str(nu) \
              + "," + str(kernel_param) + ") from " + input_table + ")"
        plpy.execute(sql)
        # Store the model learned; the intercept is -rho for novelty detection.
        plpy.execute("insert into " + model_table
                     + "_param select id, (model).rho * -1.0, '" + kernel_func
                     + "', " + str(kernel_param)
                     + " from svm_temp_result")
        plpy.execute("select " + madlib_schema
                     + ".svm_store_model('svm_temp_result', '"
                     + model_table + "', '" + model_table + "')")
    # Retrieve and return the summary for each model learned
    if parallel:
        # Per-segment models contain model_table as a prefix but are not equal to it.
        where_cond = "position('" + model_table + "' in id) > 0 AND '" \
                     + model_table + "' <> id"
    else:
        where_cond = "id = '" + model_table + "'"
    summary = plpy.execute("select id, (model).inds, (model).rho, "
                           "(model).nsvs from svm_temp_result where " + where_cond)
    # Accumulate with append (the original re-created the list on every
    # iteration, which is quadratic in the number of models).
    result = []
    for i in range(summary.nrows()):
        result.append((model_table, summary[i]['id'], summary[i]['inds'],
                       summary[i]['rho'], summary[i]['nsvs']))
    # Clean up the temp storage of models
    plpy.execute('drop table svm_temp_result')
    return result
# ---------------------------------------------------
# Function to predict the labels of points in a table
# ---------------------------------------------------
def svm_predict_batch( input_table, data_col, id_col, model_table, output_table, parallel):
    """
    Scores the data points stored in a table using a learned support vector model.
    @param input_table Name of table/view containing the data points to be scored
    @param data_col Name of column in input_table containing the data points
    @param id_col Name of column in input_table containing (integer) identifier for data point
    @param model_table Name of learned model
    @param output_table Name of table to store the results
    @param parallel A flag indicating whether the system should learn multiple models in parallel
    @return Status message naming the input and output tables
    """
    __svm_predict_validate_parameters(input_table, data_col, id_col, model_table, output_table, parallel)
    plpy.execute("create table " + output_table + " ( id int, prediction float8 ) "
                 "m4_ifdef(`__POSTGRESQL__', `', `distributed by (id)')")
    if (parallel) :
        # Parallel training stored one model per segment, each named
        # model_table || seg_id; score the input against every such model.
        model_ids_t = plpy.execute("SELECT DISTINCT(id) model_id FROM " + model_table
                                   + " WHERE position('" + model_table
                                   + "' in id) > 0 AND '" + model_table + "' <> id;")
        num_models = len(model_ids_t)
        for i in range(num_models):
            # Fetch this model's intercept and kernel configuration.
            param_t = plpy.execute("SELECT * FROM " + model_table + "_param WHERE id = '"
                                   + model_ids_t[i]['model_id'] + "'")
            intercept = param_t[0]['intercept']
            kernel_func = param_t[0]['kernel']
            kernel_param = param_t[0]['kernel_param']
            # prediction = sum over support vectors of weight * K(sv, x) + intercept
            sql = 'insert into ' + output_table + '(select t.' + id_col + ', sum(weight * ' \
                  + kernel_func + '(m.sv, t.' + data_col + \
                  ', ' + str(kernel_param) + '::float8)) + ' \
                  + str(intercept) \
                  + ' from ' + model_table + ' m, ' + input_table + ' t where m.id = \'' \
                  + model_ids_t[i]['model_id'] + '\' group by 1)'
            plpy.execute(sql)
    else :
        # Single model: the _param table holds exactly one configuration row.
        param_t = plpy.execute('SELECT * FROM ' + model_table + '_param')
        intercept = param_t[0]['intercept']
        kernel_func = param_t[0]['kernel']
        kernel_param = param_t[0]['kernel_param']
        # prediction = sum over support vectors of weight * K(sv, x) + intercept
        sql = 'insert into ' + output_table + '(select t.' + id_col + ', sum(weight * ' \
              + kernel_func + '(m.sv, t.' + data_col + \
              ', ' + str(kernel_param) + '::float8)) + ' \
              + str(intercept) + ' from ' \
              + model_table + ' m, ' + input_table + ' t where m.id = \'' + model_table \
              + '\' group by 1)'
        plpy.execute(sql)
    return '''Finished processing data points in %s table; results are stored in %s table.
           ''' % (input_table,output_table)
# ------------------------------------------------------------------------------
# This function stores a collection of models learned in parallel into the model_table.
# The different models are stored in model_temp_table and are assumed to be named model_table1, model_table2, ....
# ------------------------------------------------------------------------------
def __svm_store_model(madlib_schema, input_table, model_table, model_temp_table, model_name):
    """
    Persist a collection of models learned in parallel into model_table.

    The per-segment models live in model_temp_table and are stored under the
    names model_name || seg_id via the svm_store_model() SQL function.
    """
    # Determine which segments contributed a model (Postgres has a single "segment" 0).
    numproc_t = plpy.execute("select distinct(m4_ifdef(`__POSTGRESQL__', `0', `gp_segment_id')) "
                             "AS seg_id from " + input_table)
    total = len(numproc_t)
    for idx in range(total):
        segment = numproc_t[idx]['seg_id']
        plpy.execute("select " + madlib_schema + ".svm_store_model('"
                     + model_temp_table + "', '" + model_name + str(segment)
                     + "', '" + model_table + "')")