blob: 65fc0178c0ac65aa687b88e7d89bbd41780ccaec [file] [log] [blame]
#!/usr/bin/env python
import plpy
def __validate_input_table(input_table) :
Assert the name of the input table should not be None, and
the dimension of the column "ind" should be less than or equal to 102, 400.
@param input_table Name of table/view containing the training data
rv = plpy.execute("SELECT array_upper(ind, 1) as dim FROM {0} LIMIT 1".format(input_table));
if rv.nrows() == 0 :
plpy.error("the training table is empty");
if rv is not None and rv[0]["dim"] > 102400:
plpy.error("the maximum number of features is 102400");
# -----------------------------------------------
# Function to run the regression algorithm
# -----------------------------------------------
def svm_regression( madlib_schema, input_table, model_table, parallel, kernel_func, verbose = False, eta = 0.1, nu = 0.005, slambda = 0.05):
Executes the support vector regression algorithm.
@param input_table Name of table/view containing the training data
@param model_table Name of table under which we want to store the learned model
@param parallel A flag indicating whether the system should learn multiple models in parallel
@param kernel_func Kernel function
@param verbose Verbosity of reporting
@param eta Learning rate in (0,1] (default value is 0.1)
@param nu Compression parameter in (0,1] associated with the fraction of training data that will become support vectors (default value is 0.005)
@param slambda Regularisation parameter (default value is 0.2)
# Output error if model_table already exist
# if __check_rel_exist(model_table):
# plpy.error('Table ' + model_table + ' exists; please use a different model table or drop ' + model_table + ' before calling this function.');
# plpy.execute('drop table if exists ' + model_table);
plpy.execute('create table ' + model_table + ' ( id text, weight float8, sv float8[] ) m4_ifdef(`__GREENPLUM__', `distributed randomly')');
plpy.execute('create table ' + model_table + '_param ( id text, intercept float8, kernel text ) m4_ifdef(`__GREENPLUM__', `distributed randomly')');
plpy.execute('create temp table svm_temp_result ( id text, model ' + madlib_schema + '.svm_model_rec ) m4_ifdef(`__GREENPLUM__', `distributed randomly')');
if (verbose):"Parameters:");" * input_table = %s" % input_table);" * model_table = " + model_table);" * parallel = " + str(parallel));" * kernel_func = " + kernel_func);" * eta = " + str(eta));" * nu = " + str(nu));" * slambda = " + str(slambda));
if (parallel) :
# Learning multiple models in parallel
# Start learning process
sql = 'insert into svm_temp_result (select \'' + model_table + '\' || m4_ifdef(`__GREENPLUM__', `gp_segment_id', `0'), ' + madlib_schema + '.svm_reg_agg(ind, label,\'' + kernel_func + '\',' + str(eta) + ',' + str(nu) + ',' + str(slambda) + ') from ' + input_table + ' group by 1)';
plpy.execute( sql);
# Store the models learned
plpy.execute('insert into ' + model_table + '_param select id, (model).b, \'' + kernel_func + '\' from svm_temp_result');
__svm_store_model(madlib_schema, input_table, model_table, 'svm_temp_result', model_table)
else :
# Learning a single model
# Start learning process
sql = 'insert into svm_temp_result (select \'' + model_table + '\', ' + madlib_schema + '.svm_reg_agg(ind, label,\'' + kernel_func + '\',' + str(eta) + ',' + str(nu) + ',' + str(slambda) + ') from ' + input_table + ')';
# Store the model learned
plpy.execute('insert into ' + model_table + '_param select id, (model).b, \'' + kernel_func + '\' from svm_temp_result');
plpy.execute('select ' + madlib_schema + '.svm_store_model(\'svm_temp_result\', \'' + model_table + '\', \'' + model_table + '\')');
# Retrieve and return the summary for each model learned
if parallel:
where_cond = "position('" + model_table + "' in id) > 0 AND '" + model_table + "' <> id";
where_cond = "id = '" + model_table + "'";
summary = plpy.execute("select id, (model).inds, (model).cum_err, (model).epsilon, (model).b, (model).nsvs from svm_temp_result where " + where_cond);
result = [];
for i in range(0,summary.nrows()):
result = result + [(model_table, summary[i]['id'], summary[i]['inds'], summary[i]['cum_err'], summary[i]['epsilon'], summary[i]['b'], summary[i]['nsvs'])];
# Clean up temp storage of models
plpy.execute('drop table svm_temp_result');
return result;
# -----------------------------------------------
# Function to run the classification algorithm
# -----------------------------------------------
def svm_classification( madlib_schema, input_table, model_table, parallel, kernel_func, verbose=False, eta=0.1, nu=0.005):
Executes the support vector classification algorithm.
@param input_table Name of table/view containing the training data
@param model_table Name under which we want to store the learned model
@param parallel A flag indicating whether the system should learn multiple models in parallel
@param kernel_func Kernel function
@param verbose Verbosity of reporting
@param eta Learning rate in (0,1] (default value is 0.1)
@param nu Compression parameter in (0,1] associated with the fraction of training data that will become support vectors (default value is 0.005)
# Output error if model_table already exist
# if __check_rel_exist(model_table):
# plpy.error('Table ' + model_table + ' exists; please use a different model table or drop ' + model_table + ' before calling this function.');
# plpy.execute('drop table if exists ' + model_table);
plpy.execute('create table ' + model_table + ' ( id text, weight float8, sv float8[] ) m4_ifdef(`__GREENPLUM__', `distributed randomly')');
plpy.execute('create table ' + model_table + '_param ( id text, intercept float8, kernel text ) m4_ifdef(`__GREENPLUM__', `distributed randomly')');
plpy.execute('create temp table svm_temp_result ( id text, model ' + madlib_schema + '.svm_model_rec ) m4_ifdef(`__GREENPLUM__', `distributed randomly')');
if (verbose):"Parameters:");" * input_table = " + input_table);" * model_table = " + model_table);" * parallel = " + str(parallel));" * eta = " + str(eta));" * nu = " + str(nu));
if (parallel) :
# Learning multiple models in parallel
# Start learning process
sql = 'insert into svm_temp_result (select \'' + model_table + '\' || m4_ifdef(`__GREENPLUM__', `gp_segment_id', `0'), ' + madlib_schema + '.svm_cls_agg(ind, label,\'' + kernel_func + '\',' + str(eta) + ',' + str(nu) + ') from ' + input_table + ' group by 1)';
# Store the models learned
plpy.execute('insert into ' + model_table + '_param select id, (model).b, \'' + kernel_func + '\' from svm_temp_result');
__svm_store_model(madlib_schema, input_table, model_table, 'svm_temp_result', model_table)
else :
# Learning a single model
# Start learning process
sql = 'insert into svm_temp_result (select \'' + model_table + '\', ' + madlib_schema + '.svm_cls_agg(ind, label,\'' + kernel_func + '\',' + str(eta) + ',' + str(nu) + ') from ' + input_table + ')';
# Store the model learned
plpy.execute('insert into ' + model_table + '_param select id, (model).b, \'' + kernel_func + '\' from svm_temp_result');
plpy.execute('select ' + madlib_schema + '.svm_store_model(\'svm_temp_result\', \'' + model_table + '\', \'' + model_table + '\')');
# Retrieve and return the summary for each model learned
if parallel:
where_cond = "position('" + model_table + "' in id) > 0 AND '" + model_table + "' <> id";
where_cond = "id = '" + model_table + "'";
summary = plpy.execute("select id, (model).inds, (model).cum_err, (model).rho, (model).b, (model).nsvs from svm_temp_result where " + where_cond);
result = [];
for i in range(0,summary.nrows()):
result = result + [(model_table, summary[i]['id'], summary[i]['inds'], summary[i]['cum_err'], summary[i]['rho'], summary[i]['b'], summary[i]['nsvs'])];
# Clean up temp storage of models
plpy.execute('drop table svm_temp_result');
return result;
# -----------------------------------------------
# Function to run the novelty detection algorithm
# -----------------------------------------------
def svm_novelty_detection( madlib_schema, input_table, model_table, parallel, kernel_func, verbose=False, eta = 0.1, nu = 0.01):
Executes the support vector novelty detection algorithm.
@param input_table Name of table/view containing the training data
@param model_table Name of table under which we want to store the learned model
@param parallel A flag indicating whether the system should learn multiple models in parallel.
@param kernel_func Kernel function
@param verbose Verbosity of reporting
@param eta Learning rate in (0,1] (default value is 0.1)
@param nu Compression parameter in (0,1] associated with the fraction of training data that will become support vectors (default value is 0.01)
# Output error if model_table already exist
# if __check_rel_exist(model_table):
# plpy.error('Table ' + model_table + ' exists; please use a different model table or drop ' + model_table + ' before calling this function.');
# plpy.execute('drop table if exists ' + model_table);
plpy.execute('create table ' + model_table + ' ( id text, weight float8, sv float8[] ) m4_ifdef(`__GREENPLUM__', `distributed randomly')');
plpy.execute('create table ' + model_table + '_param ( id text, intercept float8, kernel text ) m4_ifdef(`__GREENPLUM__', `distributed randomly')');
plpy.execute('create temp table svm_temp_result ( id text, model ' + madlib_schema + '.svm_model_rec ) m4_ifdef(`__GREENPLUM__', `distributed randomly')');
if (verbose):"Parameters:");" * input_table = " + input_table);" * model_table = " + model_table);" * parallel = " + str(parallel));" * eta = " + str(eta));" * nu = " + str(nu));
if (parallel) :
# Learning multiple models in parallel
# Start learning process
sql = 'insert into svm_temp_result (select \'' + model_table + '\' || m4_ifdef(`__GREENPLUM__', `gp_segment_id', `0'), ' + madlib_schema + '.svm_nd_agg(ind,\'' + kernel_func + '\',' + str(eta) + ',' + str(nu) + ') from ' + input_table + ' group by 1)';
# Store the models learned
plpy.execute('insert into ' + model_table + '_param select id, (model).rho * -1.0, \'' + kernel_func + '\' from svm_temp_result');
__svm_store_model(madlib_schema, input_table, model_table, 'svm_temp_result', model_table)
else :
# Learning a single model
# Start learning process
sql = 'insert into svm_temp_result (select \'' + model_table + '\', ' + madlib_schema + '.svm_nd_agg(ind,\'' + kernel_func + '\',' + str(eta) + ',' + str(nu) + ') from ' + input_table + ')';
# Store the model learned
plpy.execute('insert into ' + model_table + '_param select id, (model).rho * -1.0, \'' + kernel_func + '\' from svm_temp_result');
plpy.execute('select ' + madlib_schema + '.svm_store_model(\'svm_temp_result\', \'' + model_table + '\', \'' + model_table + '\')');
# Retrieve and return the summary for each model learned
if parallel:
where_cond = "position('" + model_table + "' in id) > 0 AND '" + model_table + "' <> id";
where_cond = "id = '" + model_table + "'";
summary = plpy.execute("select id, (model).inds, (model).rho, (model).nsvs from svm_temp_result where " + where_cond);
result = [];
for i in range(0,summary.nrows()):
result = result + [(model_table, summary[i]['id'], summary[i]['inds'], summary[i]['rho'], summary[i]['nsvs'])];
# Clean up the temp storage of models
plpy.execute('drop table svm_temp_result');
return result;
# ----------------------------------------------
# Function to predict the labels of a data point
# ----------------------------------------------
def svm_predict(model_table, ind):
Scores a data point using a learned support vector model.
@param model_table The table storing the learned model to be used
@param ind The data point to be scored
nmodels_rec = plpy.execute("select count(distinct id) from " + model_table);
if (nmodels_rec[0]['count'] <> 1):
plpy.error("Error in svm_predict(): the table contains an ensemble of models");
param_t = plpy.execute("select * from " + model_table + "_param")
intercept = param_t[0]['intercept']
kernel_func = param_t[0]['kernel']
ret = plpy.execute("SELECT sum(weight * " + kernel_func + "(sv, array[" + str(ind)[1:-1] + "]::float8[])) + " + str(intercept) + " as sum FROM " + model_table);
if (ret.nrows() == 0):
plpy.error("Error executing svm_predict()");
return ret[0]['sum'];
# ----------------------------------------------
# Function to predict the labels of a data point
# ----------------------------------------------
def svm_predict_combo( madlib_schema, model_table, ind):
Scores a data point using an ensemble of support vector models.
@param model_table The table storing the learned model to be used
@param ind The data point to be scored
model_ids_t = plpy.execute("SELECT DISTINCT(id) model_id from " + model_table);
nmodels = len(model_ids_t);
if (nmodels <= 1):"svm_predict_combo(): not an ensemble of models, will use a single prediction");
pred = plpy.execute("select * from " + madlib_schema + ".svm_predict('" + model_table + "', array[" + str(ind)[1:-1] + "])")
return [( model_table, pred[0]['svm_predict'] )]
sumpr = 0.0;
ret = [];
for i in range(0,nmodels):
param_t = plpy.execute("select * from " + model_table + "_param where id = '" + model_ids_t[i]['model_id'] + "'")
intercept = param_t[0]['intercept']
kernel_func = param_t[0]['kernel']
pred = plpy.execute("select sum(weight * " + kernel_func + "(sv, array[" + str(ind)[1:-1] + "])) + " + str(intercept) + " as sum from " + model_table + " where id = '" + model_ids_t[i]['model_id'] + "'");
if (pred.nrows() == 0):
plpy.error("svm_predict_combo(): failed to compute prediction for model '" + model_ids_t[i]['model_id'] + "'.");
prediction = pred[0]['sum'];
sumpr = sumpr + prediction;
ret = ret + [(model_ids_t[i]['model_id'], prediction)];
ret = ret + [('avg', sumpr/nmodels)];
return ret;
# ---------------------------------------------------
# Function to predict the labels of points in a table
# ---------------------------------------------------
def svm_predict_batch( input_table, data_col, id_col, model_table, output_table, parallel):
Scores the data points stored in a table using a learned support vector model.
@param input_table Name of table/view containing the data points to be scored
@param data_col Name of column in input_table containing the data points
@param id_col Name of column in input_table containing (integer) identifier for data point
@param model_table Name of learned model
@param output_table Name of table to store the results
@param parallel A flag indicating whether the system should learn multiple models in parallel
plpy.execute('drop table if exists ' + output_table);
plpy.execute('create table ' + output_table + ' ( id int, prediction float8 ) m4_ifdef(`__GREENPLUM__', `distributed by (id)')');
if (parallel) :
model_ids_t = plpy.execute('SELECT DISTINCT(id) model_id FROM ' + model_table + ' WHERE position(\'' + model_table + '\' in id) > 0 AND \'' + model_table + '\' <> id;');
num_models = len(model_ids_t)
for i in range(0,num_models):
param_t = plpy.execute('SELECT * FROM ' + model_table + '_param WHERE id = \'' + model_ids_t[i]['model_id'] + '\'')
intercept = param_t[0]['intercept']
kernel_func = param_t[0]['kernel']
sql = 'insert into ' + output_table + '(select t.' + id_col + ', sum(weight * ' + kernel_func + '(, t.' + data_col + ')) + ' + str(intercept) + ' from ' + model_table + ' m, ' + input_table + ' t where = \'' + model_ids_t[i]['model_id'] + '\' group by 1)';
else :
param_t = plpy.execute('SELECT * FROM ' + model_table + '_param');
intercept = param_t[0]['intercept']
kernel_func = param_t[0]['kernel']
sql = 'insert into ' + output_table + '(select t.' + id_col + ', sum(weight * ' + kernel_func + '(, t.' + data_col + ')) + ' + str(intercept) + ' from ' + model_table + ' m, ' + input_table + ' t where = \'' + model_table + '\' group by 1)';
return '''Finished processing data points in %s table; results are stored in %s table.
''' % (input_table,output_table)
# ---------------------------------------------------
# Function to run the linear classification algorithm
# ---------------------------------------------------
def lsvm_classification( madlib_schema, input_table, model_table, parallel, verbose=False, eta=0.1, reg=0.001):
Executes the linear support vector classification algorithm.
@param input_table Name of table/view containing the training data
@param model_table Name under which we want to store the learned model
@param parallel A flag indicating whether the system should learn multiple models in parallel
@param verbose Verbosity of reporting
@param eta Initial learning rate in (0,1] (default value is 0.1)
@param reg Regularization parameter, often chosen by cross-validation (default value is 0.001)
plpy.execute('CREATE TABLE ' + model_table + ' (id text, weights float8[], wdiv float8, wbias float8) m4_ifdef(`__GREENPLUM__', `DISTRIBUTED RANDOMLY')');
plpy.execute('CREATE TEMP TABLE svm_temp_result ( id text, model ' + madlib_schema + '.lsvm_sgd_model_rec ) m4_ifdef(`__GREENPLUM__', `DISTRIBUTED RANDOMLY')');
if (verbose):"Parameters:");" * input_table = %s" % input_table);" * model_table = " + model_table);" * parallel = " + str(parallel));" * eta = " + str(eta));" * reg = " + str(reg));
if (parallel) :
# Learning multiple models in parallel
# Start learning process
sql = 'INSERT INTO svm_temp_result (SELECT \'' + model_table + '\' || m4_ifdef(`__GREENPLUM__', `gp_segment_id', `0'), ' + madlib_schema + '.lsvm_sgd_agg(ind, label, ' + str(eta) + ',' + str(reg) + ') FROM ' + input_table + ' group by 1)';
# Store the model learned
plpy.execute('INSERT INTO ' + model_table + ' SELECT id, (model).weights, (model).wdiv, (model).wbias From svm_temp_result');
else :
# Learning multiple models in parallel
# Start learning a single model
sql = 'INSERT INTO svm_temp_result (SELECT \'' + model_table + '\',' + madlib_schema + '.lsvm_sgd_agg(ind, label,' + str(eta) + ',' + str(reg) + ') FROM ' + input_table + ')';
# Store the model learned
plpy.execute('INSERT INTO ' + model_table + ' SELECT id, (model).weights, (model).wdiv, (model).wbias FROM svm_temp_result');
# Retrieve and return the summary for each model learned
if parallel:
where_cond = "position('" + model_table + "' in id) > 0 AND '" + model_table + "' <> id";
where_cond = "id = '" + model_table + "'";
summary = plpy.execute("SELECT id, (model).inds, (model).ind_dim, (model).cum_err, (model).wbias, (model).wdiv from svm_temp_result where " + where_cond);
result = [];
for i in range(0,summary.nrows()):
result = result + [(model_table, summary[i]['id'], summary[i]['inds'], summary[i]['ind_dim'], summary[i]['cum_err'], summary[i]['wbias'], summary[i]['wdiv'])];
# Clean up temp storage of models
plpy.execute('drop table svm_temp_result');
return result;
# ----------------------------------------------------------------------------------
# Function to predict the labels of a data point using a linear support vector model
# ----------------------------------------------------------------------------------
def lsvm_predict(madlib_schema, model_table, ind):
Scores a data point using a learned linear support vector model.
@param model_table The table storing the learned model to be used
@param ind The data point to be scored
nmodels_rec = plpy.execute("SELECT count(distinct id) FROM " + model_table)
if (nmodels_rec[0]['count'] <> 1):
plpy.error("Error in lsvm_predict(): the table contains an ensemble of models")
param_t = plpy.execute('SELECT * FROM ' + model_table);
weights = param_t[0]['weights']
wdiv = param_t[0]['wdiv']
wbias = param_t[0]['wbias']
sql = 'SELECT ' + madlib_schema + '.svm_dot(\'' + (str(weights).replace('[', '{')).replace(']','}') + '\', \'' + (str(ind).replace('[', '{')).replace(']','}') +'\') /' + str(wdiv) + ' + ' + str(wbias) + ' AS sum FROM ' + model_table
ret = plpy.execute(sql);
if (ret.nrows() == 0):
plpy.error("Error executing lsvm_predict()");
return ret[0]['sum'];
# ------------------------------------------------------------------------------------------
# Function to predict the labels of a data point using a set of linear support vector models
# ------------------------------------------------------------------------------------------
def lsvm_predict_combo( madlib_schema, model_table, ind):
Scores a data point using an ensemble of support vector models.
@param model_table The table storing the learned model to be used
@param ind The data point to be scored
model_ids_t = plpy.execute("SELECT DISTINCT(id) model_id from " + model_table);
nmodels = len(model_ids_t);
if (nmodels <= 1):"lsvm_predict_combo(): not an ensemble of models, will use a single prediction");
pred = plpy.execute("SELECT * from " + madlib_schema + ".lsvm_predict('" + model_table + "', array[" + str(ind)[1:-1] + "])")
return [( model_table, pred[0]['lsvm_predict'] )]
sumpr = 0.0;
ret = [];
for i in range(0,nmodels):
param_t = plpy.execute("SELECT * FROM " + model_table + " WHERE id = '" + model_ids_t[i]['model_id'] + "'")
weights = param_t[0]['weights']
wdiv = param_t[0]['wdiv']
wbias = param_t[0]['wbias']
sql = 'SELECT ' + madlib_schema + '.svm_dot(\'' +(str(weights).replace('[', '{')).replace(']','}') + '\', \'' + (str(ind).replace('[', '{')).replace(']','}') +'\') /' + str(wdiv) + ' + ' + str(wbias) + ' AS sum FROM ' + model_table + ' WHERE id = \'' + model_ids_t[i]['model_id'] + '\''
pred = plpy.execute(sql);
if (pred.nrows() == 0):
plpy.error("lsvm_predict_combo(): failed to compute prediction for model '" + model_ids_t[i]['model_id'] + "'.");
prediction = pred[0]['sum'];
sumpr = sumpr + prediction;
ret = ret + [(model_ids_t[i]['model_id'], prediction)];
ret = ret + [('avg', sumpr/nmodels)];
return ret;
# ------------------------------------------------------------------------------
# Function to predict the labels of points in a table using a linear classifier
# ------------------------------------------------------------------------------
def lsvm_predict_batch( madlib_schema, input_table, data_col, id_col, model_table, output_table, parallel):
Scores the data points stored in a table using a learned support vector model.
@param input_table Name of table/view containing the data points to be scored
@param data_col Name of column in input_table containing the data points
@param id_col Name of column in input_table containing (integer) identifier for data point
@param model_table Name of learned model
@param output_table Name of table to store the results
@param parallel A flag indicating whether the system should learn multiple models in parallel
plpy.execute('DROP TABLE IF EXISTS ' + output_table);
plpy.execute('CREATE TABLE ' + output_table + ' ( id int, prediction float8 ) m4_ifdef(`__GREENPLUM__', `distributed by (id)')');
if (parallel) :
model_ids_t = plpy.execute('SELECT DISTINCT(id) model_id FROM ' + model_table + ' WHERE position(\'' + model_table + '\' in id) > 0 AND \'' + model_table + '\' <> id;');
num_models = len(model_ids_t)
if (num_models == 0) :
plpy.error("lsvm_predict_batch(): the specified model table does not contain parallelly learned models.");
for i in range(0,num_models):
param_t = plpy.execute('SELECT * FROM ' + model_table + ' WHERE id = \'' + model_ids_t[i]['model_id'] + '\'')
weights = param_t[0]['weights']
wdiv = param_t[0]['wdiv']
wbias = param_t[0]['wbias']
sql = 'INSERT INTO ' + output_table + ' (SELECT ' + id_col + ',' + madlib_schema + '.svm_dot(\'' +(str(weights).replace('[', '{')).replace(']','}') + '\', ind)/' + str(wdiv) + '+' + str(wbias) + ' FROM ' + input_table + ')';
else :
param_t = plpy.execute('SELECT * FROM ' + model_table);
weights = param_t[0]['weights']
wdiv = param_t[0]['wdiv']
wbias = param_t[0]['wbias']
sql = 'INSERT INTO ' + output_table + ' (SELECT ' + id_col + ', ' + madlib_schema + '.svm_dot(\'' + (str(weights).replace('[', '{')).replace(']','}') + '\', ind) /' + str(wdiv) + ' + ' + str(wbias) + ' FROM ' + input_table + ')';
return '''Finished processing data points in %s table; results are stored in %s table.
''' % (input_table,output_table)
# ------------------------------------------------------------------------------
# This function stores a collection of models learned in parallel into the model_table.
# The different models are stored in model_temp_table and are assumed to be named model_table1, model_table2, ....
# ------------------------------------------------------------------------------
def __svm_store_model(madlib_schema, input_table, model_table, model_temp_table, model_name):
# Store the models learned
numproc_t = plpy.execute('select distinct(m4_ifdef(`__GREENPLUM__', `gp_segment_id', `0')) AS seg_id from ' + input_table);
res_len = len(numproc_t);
for i in range(0,res_len):
seg_id = numproc_t[i]['seg_id'];
plpy.execute("select "+madlib_schema + ".svm_store_model('" + model_temp_table + \
"', '" + model_name + str(seg_id) + "', '" + model_table + "')");