blob: 022a77f9d4af550fcc74c7d2613d967bae928a72 [file] [log] [blame]
#!/usr/bin/env python
import plpy
# -----------------------------------------------
# Function to run the regression algorithm
# -----------------------------------------------
def svm_regression( madlib_schema, input_table, model_table, parallel, kernel_func, verbose = False, eta = 0.1, nu = 0.005, slambda = 0.05):
"""
Executes the support vector regression algorithm.
@param input_table Name of table/view containing the training data
@param model_table Name of table under which we want to store the learned model
@param parallel A flag indicating whether the system should learn multiple models in parallel
@param kernel_func Kernel function
@param verbose Verbosity of reporting
@param eta Learning rate in (0,1] (default value is 0.1)
@param nu Compression parameter in (0,1] associated with the fraction of training data that will become support vectors (default value is 0.005)
@param slambda Regularisation parameter (default value is 0.2)
"""
# Output error if model_table already exist
# if __check_rel_exist(model_table):
# plpy.error('Table ' + model_table + ' exists; please use a different model table or drop ' + model_table + ' before calling this function.');
# plpy.execute('drop table if exists ' + model_table);
plpy.execute('create table ' + model_table + ' ( id text, weight float8, sv float8[] ) m4_ifdef(`GREENPLUM', `distributed randomly')');
plpy.execute('create table ' + model_table + '_param ( id text, intercept float8, kernel text ) m4_ifdef(`GREENPLUM', `distributed randomly')');
plpy.execute('create temp table svm_temp_result ( id text, model ' + madlib_schema + '.svm_model_rec ) m4_ifdef(`GREENPLUM', `distributed randomly')');
if (verbose):
plpy.info("Parameters:");
plpy.info(" * input_table = %s" % input_table);
plpy.info(" * model_table = " + model_table);
plpy.info(" * parallel = " + str(parallel));
plpy.info(" * kernel_func = " + kernel_func);
plpy.info(" * eta = " + str(eta));
plpy.info(" * nu = " + str(nu));
plpy.info(" * slambda = " + str(slambda));
if (parallel) :
# Learning multiple models in parallel
# Start learning process
sql = 'insert into svm_temp_result (select \'' + model_table + '\' || m4_ifdef(`GREENPLUM', `gp_segment_id', `0'), ' + madlib_schema + '.svm_reg_agg(ind, label,\'' + kernel_func + '\',' + str(eta) + ',' + str(nu) + ',' + str(slambda) + ') from ' + input_table + ' group by 1)';
plpy.execute( sql);
# Store the models learned
plpy.execute('insert into ' + model_table + '_param select id, (model).b, \'' + kernel_func + '\' from svm_temp_result');
numproc_t = plpy.execute('select count(distinct(m4_ifdef(`GREENPLUM', `gp_segment_id', `0'))) from ' + input_table);
numproc = numproc_t[0]['count'];
plpy.execute('select ' + madlib_schema + '.svm_store_model(\'svm_temp_result\', \'' + model_table + '\',\'' + model_table + '\', ' + str(numproc) + ')');
else :
# Learning a single model
# Start learning process
sql = 'insert into svm_temp_result (select \'' + model_table + '\', ' + madlib_schema + '.svm_reg_agg(ind, label,\'' + kernel_func + '\',' + str(eta) + ',' + str(nu) + ',' + str(slambda) + ') from ' + input_table + ')';
plpy.execute(sql);
# Store the model learned
plpy.execute('insert into ' + model_table + '_param select id, (model).b, \'' + kernel_func + '\' from svm_temp_result');
plpy.execute('select ' + madlib_schema + '.svm_store_model(\'svm_temp_result\', \'' + model_table + '\', \'' + model_table + '\')');
# Retrieve and return the summary for each model learned
if parallel:
where_cond = "position('" + model_table + "' in id) > 0 AND '" + model_table + "' <> id";
else:
where_cond = "id = '" + model_table + "'";
summary = plpy.execute("select id, (model).inds, (model).cum_err, (model).epsilon, (model).b, (model).nsvs from svm_temp_result where " + where_cond);
result = [];
for i in range(0,summary.nrows()):
result = result + [(model_table, summary[i]['id'], summary[i]['inds'], summary[i]['cum_err'], summary[i]['epsilon'], summary[i]['b'], summary[i]['nsvs'])];
# Clean up temp storage of models
plpy.execute('drop table svm_temp_result');
return result;
# -----------------------------------------------
# Function to run the classification algorithm
# -----------------------------------------------
def svm_classification( madlib_schema, input_table, model_table, parallel, kernel_func, verbose=False, eta=0.1, nu=0.005):
"""
Executes the support vector classification algorithm.
@param input_table Name of table/view containing the training data
@param model_table Name under which we want to store the learned model
@param parallel A flag indicating whether the system should learn multiple models in parallel
@param kernel_func Kernel function
@param verbose Verbosity of reporting
@param eta Learning rate in (0,1] (default value is 0.1)
@param nu Compression parameter in (0,1] associated with the fraction of training data that will become support vectors (default value is 0.005)
"""
# Output error if model_table already exist
# if __check_rel_exist(model_table):
# plpy.error('Table ' + model_table + ' exists; please use a different model table or drop ' + model_table + ' before calling this function.');
# plpy.execute('drop table if exists ' + model_table);
plpy.execute('create table ' + model_table + ' ( id text, weight float8, sv float8[] ) m4_ifdef(`GREENPLUM', `distributed randomly')');
plpy.execute('create table ' + model_table + '_param ( id text, intercept float8, kernel text ) m4_ifdef(`GREENPLUM', `distributed randomly')');
plpy.execute('create temp table svm_temp_result ( id text, model ' + madlib_schema + '.svm_model_rec ) m4_ifdef(`GREENPLUM', `distributed randomly')');
if (verbose):
plpy.info("Parameters:");
plpy.info(" * input_table = " + input_table);
plpy.info(" * model_table = " + model_table);
plpy.info(" * parallel = " + str(parallel));
plpy.info(" * eta = " + str(eta));
plpy.info(" * nu = " + str(nu));
if (parallel) :
# Learning multiple models in parallel
# Start learning process
sql = 'insert into svm_temp_result (select \'' + model_table + '\' || m4_ifdef(`GREENPLUM', `gp_segment_id', `0'), ' + madlib_schema + '.svm_cls_agg(ind, label,\'' + kernel_func + '\',' + str(eta) + ',' + str(nu) + ') from ' + input_table + ' group by 1)';
plpy.execute(sql);
# Store the models learned
plpy.execute('insert into ' + model_table + '_param select id, (model).b, \'' + kernel_func + '\' from svm_temp_result');
numproc_t = plpy.execute('select count(distinct(m4_ifdef(`GREENPLUM', `gp_segment_id', `0'))) from ' + input_table);
numproc = numproc_t[0]['count'];
plpy.execute('select ' + madlib_schema + '.svm_store_model(\'svm_temp_result\', \'' + model_table + '\',\'' + model_table + '\', ' + str(numproc) + ')');
else :
# Learning a single model
# Start learning process
sql = 'insert into svm_temp_result (select \'' + model_table + '\', ' + madlib_schema + '.svm_cls_agg(ind, label,\'' + kernel_func + '\',' + str(eta) + ',' + str(nu) + ') from ' + input_table + ')';
plpy.execute(sql);
# Store the model learned
plpy.execute('insert into ' + model_table + '_param select id, (model).b, \'' + kernel_func + '\' from svm_temp_result');
plpy.execute('select ' + madlib_schema + '.svm_store_model(\'svm_temp_result\', \'' + model_table + '\', \'' + model_table + '\')');
# Retrieve and return the summary for each model learned
if parallel:
where_cond = "position('" + model_table + "' in id) > 0 AND '" + model_table + "' <> id";
else:
where_cond = "id = '" + model_table + "'";
summary = plpy.execute("select id, (model).inds, (model).cum_err, (model).rho, (model).b, (model).nsvs from svm_temp_result where " + where_cond);
result = [];
for i in range(0,summary.nrows()):
result = result + [(model_table, summary[i]['id'], summary[i]['inds'], summary[i]['cum_err'], summary[i]['rho'], summary[i]['b'], summary[i]['nsvs'])];
# Clean up temp storage of models
plpy.execute('drop table svm_temp_result');
return result;
# -----------------------------------------------
# Function to run the novelty detection algorithm
# -----------------------------------------------
def svm_novelty_detection( madlib_schema, input_table, model_table, parallel, kernel_func, verbose=False, eta = 0.1, nu = 0.01):
"""
Executes the support vector novelty detection algorithm.
@param input_table Name of table/view containing the training data
@param model_table Name of table under which we want to store the learned model
@param parallel A flag indicating whether the system should learn multiple models in parallel.
@param kernel_func Kernel function
@param verbose Verbosity of reporting
@param eta Learning rate in (0,1] (default value is 0.1)
@param nu Compression parameter in (0,1] associated with the fraction of training data that will become support vectors (default value is 0.01)
"""
# Output error if model_table already exist
# if __check_rel_exist(model_table):
# plpy.error('Table ' + model_table + ' exists; please use a different model table or drop ' + model_table + ' before calling this function.');
# plpy.execute('drop table if exists ' + model_table);
plpy.execute('create table ' + model_table + ' ( id text, weight float8, sv float8[] ) m4_ifdef(`GREENPLUM', `distributed randomly')');
plpy.execute('create table ' + model_table + '_param ( id text, intercept float8, kernel text ) m4_ifdef(`GREENPLUM', `distributed randomly')');
plpy.execute('create temp table svm_temp_result ( id text, model ' + madlib_schema + '.svm_model_rec ) m4_ifdef(`GREENPLUM', `distributed randomly')');
if (verbose):
plpy.info("Parameters:");
plpy.info(" * input_table = " + input_table);
plpy.info(" * model_table = " + model_table);
plpy.info(" * parallel = " + str(parallel));
plpy.info(" * eta = " + str(eta));
plpy.info(" * nu = " + str(nu));
if (parallel) :
# Learning multiple models in parallel
# Start learning process
sql = 'insert into svm_temp_result (select \'' + model_table + '\' || m4_ifdef(`GREENPLUM', `gp_segment_id', `0'), ' + madlib_schema + '.svm_nd_agg(ind,\'' + kernel_func + '\',' + str(eta) + ',' + str(nu) + ') from ' + input_table + ' group by 1)';
plpy.execute(sql);
# Store the models learned
plpy.execute('insert into ' + model_table + '_param select id, (model).rho * -1.0, \'' + kernel_func + '\' from svm_temp_result');
numproc_t = plpy.execute('select count(distinct(m4_ifdef(`GREENPLUM', `gp_segment_id', `0'))) from ' + input_table);
numproc = numproc_t[0]['count'];
plpy.execute('select ' + madlib_schema + '.svm_store_model(\'svm_temp_result\', \'' + model_table + '\',\'' + model_table + '\', ' + str(numproc) + ')');
else :
# Learning a single model
# Start learning process
sql = 'insert into svm_temp_result (select \'' + model_table + '\', ' + madlib_schema + '.svm_nd_agg(ind,\'' + kernel_func + '\',' + str(eta) + ',' + str(nu) + ') from ' + input_table + ')';
plpy.execute(sql);
# Store the model learned
plpy.execute('insert into ' + model_table + '_param select id, (model).rho * -1.0, \'' + kernel_func + '\' from svm_temp_result');
plpy.execute('select ' + madlib_schema + '.svm_store_model(\'svm_temp_result\', \'' + model_table + '\', \'' + model_table + '\')');
# Retrieve and return the summary for each model learned
if parallel:
where_cond = "position('" + model_table + "' in id) > 0 AND '" + model_table + "' <> id";
else:
where_cond = "id = '" + model_table + "'";
summary = plpy.execute("select id, (model).inds, (model).rho, (model).nsvs from svm_temp_result where " + where_cond);
result = [];
for i in range(0,summary.nrows()):
result = result + [(model_table, summary[i]['id'], summary[i]['inds'], summary[i]['rho'], summary[i]['nsvs'])];
# Clean up the temp storage of models
plpy.execute('drop table svm_temp_result');
return result;
# ----------------------------------------------
# Function to predict the labels of a data point
# ----------------------------------------------
def svm_predict(model_table, ind):
    """
    Scores a data point using a learned support vector model.

    The score is the sum over all rows of model_table of
    weight * kernel(sv, ind), plus the intercept read from the first row of
    model_table_param.

    @param model_table The table storing the learned model to be used
    @param ind The data point to be scored
    @return The score computed by the database for the data point
    """
    nmodels_rec = plpy.execute("select count(distinct id) from " + model_table)
    # Written with != because the older <> spelling is rejected by Python 3.
    if nmodels_rec[0]['count'] != 1:
        plpy.error("Error in svm_predict(): the table contains an ensemble of models")

    param_t = plpy.execute("select * from " + model_table + "_param")
    if param_t.nrows() == 0:
        # Report a missing parameter row explicitly instead of an IndexError.
        plpy.error("Error in svm_predict(): no parameters found in " + model_table + "_param")
    intercept = param_t[0]['intercept']
    kernel_func = param_t[0]['kernel']

    # str(ind)[1:-1] strips the first and last character of the printed data
    # point (presumably its enclosing brackets) so it can be spliced into array[...].
    ret = plpy.execute("SELECT sum(weight * " + kernel_func + "(sv, array[" + str(ind)[1:-1] + "]::float8[])) + " + str(intercept) + " as sum FROM " + model_table)
    if ret.nrows() == 0:
        plpy.error("Error executing svm_predict()")
    return ret[0]['sum']
# ----------------------------------------------
# Function to predict the label of a data point using an ensemble of models
# ----------------------------------------------
def svm_predict_combo( madlib_schema, model_table, ind):
    """
    Scores a data point using an ensemble of support vector models.

    The models are looked up under the ids model_table0, model_table1, ...
    up to the number of distinct ids found in model_table. If the table
    holds at most one model, the call is forwarded to svm_predict in
    madlib_schema instead.

    @param madlib_schema Name of the schema holding the svm_predict function
    @param model_table The table storing the learned model to be used
    @param ind The data point to be scored
    @return List of (model id, prediction) tuples followed by ('avg', mean prediction); for a non-ensemble table, a single (model_table, prediction) tuple
    """
    nmodels_rec = plpy.execute("select count(distinct id) from " + model_table)
    nmodels = nmodels_rec[0]['count']

    # str(ind)[1:-1] strips the first and last character of the printed data
    # point (presumably its enclosing brackets); computed once for all queries.
    point = str(ind)[1:-1]

    if nmodels <= 1:
        plpy.info("svm_predict_combo(): not an ensemble of models, will use a single prediction")
        pred = plpy.execute("select * from " + madlib_schema + ".svm_predict('" + model_table + "', array[" + point + "])")
        return [(model_table, pred[0]['svm_predict'])]

    sumpr = 0.0
    ret = []
    for i in range(0, nmodels):
        model_id = model_table + str(i)

        param_t = plpy.execute("select * from " + model_table + "_param where id = '" + model_id + "'")
        if param_t.nrows() == 0:
            # Report a missing parameter row explicitly instead of an IndexError.
            plpy.error("svm_predict_combo(): no parameters found for model '" + model_id + "'.")
        intercept = param_t[0]['intercept']
        kernel_func = param_t[0]['kernel']

        # The explicit float8[] cast matches the kernel call made by svm_predict.
        pred = plpy.execute("select sum(weight * " + kernel_func + "(sv, array[" + point + "]::float8[])) + " + str(intercept) + " as sum from " + model_table + " where id = '" + model_id + "'")
        # An ungrouped aggregate yields a NULL sum when no row matches the id,
        # so a None value is treated as a failed prediction as well.
        if pred.nrows() == 0 or pred[0]['sum'] is None:
            plpy.error("svm_predict_combo(): failed to compute prediction for model '" + model_id + "'.")

        prediction = pred[0]['sum']
        sumpr += prediction
        ret.append((model_id, prediction))

    ret.append(('avg', sumpr / nmodels))
    return ret
# ---------------------------------------------------
# Function to predict the labels of points in a table
# ---------------------------------------------------
def svm_predict_batch( input_table, data_col, id_col, model_table, output_table, parallel):
"""
Scores the data points stored in a table using a learned support vector model.
@param input_table Name of table/view containing the data points to be scored
@param data_col Name of column in input_table containing the data points
@param id_col Name of column in input_table containing (integer) identifier for data point
@param model_table Name of learned model
@param output_table Name of table to store the results
@param parallel A flag indicating whether the system should learn multiple models in parallel
"""
plpy.execute('drop table if exists ' + output_table);
plpy.execute('create table ' + output_table + ' ( id int, prediction float8 ) m4_ifdef(`GREENPLUM', `distributed by (id)')');
if (parallel) :
num_models_t = plpy.execute('SELECT COUNT(DISTINCT(id)) n FROM ' + model_table + ' WHERE position(\'' + model_table + '\' in id) > 0 AND \'' + model_table + '\' <> id;');
num_models = num_models_t[0]['n'];
for i in range(0,num_models):
param_t = plpy.execute('SELECT * FROM ' + model_table + '_param WHERE id = \'' + model_table + str(i) + '\'')
intercept = param_t[0]['intercept']
kernel_func = param_t[0]['kernel']
sql = 'insert into ' + output_table + '(select t.' + id_col + ', sum(weight * ' + kernel_func + '(m.sv, t.' + data_col + ')) + ' + str(intercept) + ' from ' + model_table + ' m, ' + input_table + ' t where m.id = \'' + model_table + str(i) + '\' group by 1)';
plpy.execute(sql);
else :
param_t = plpy.execute('SELECT * FROM ' + model_table + '_param');
intercept = param_t[0]['intercept']
kernel_func = param_t[0]['kernel']
sql = 'insert into ' + output_table + '(select t.' + id_col + ', sum(weight * ' + kernel_func + '(m.sv, t.' + data_col + ')) + ' + str(intercept) + ' from ' + model_table + ' m, ' + input_table + ' t where m.id = \'' + model_table + '\' group by 1)';
plpy.execute(sql);
return '''Finished processing data points in %s table; results are stored in %s table.
''' % (input_table,output_table)