| #!/usr/bin/env python |
| |
| import plpy |
| |
| # ----------------------------------------------- |
| # Function to run the regression algorithm |
| # ----------------------------------------------- |
| def svm_regression( madlib_schema, input_table, model_table, parallel, kernel_func, verbose = False, eta = 0.1, nu = 0.005, slambda = 0.05): |
| """ |
| Executes the support vector regression algorithm. |
| |
| @param input_table Name of table/view containing the training data |
| @param model_table Name of table under which we want to store the learned model |
| @param parallel A flag indicating whether the system should learn multiple models in parallel |
| @param kernel_func Kernel function |
| @param verbose Verbosity of reporting |
| @param eta Learning rate in (0,1] (default value is 0.1) |
| @param nu Compression parameter in (0,1] associated with the fraction of training data that will become support vectors (default value is 0.005) |
| @param slambda Regularisation parameter (default value is 0.2) |
| |
| """ |
| |
| # Output error if model_table already exist |
| # if __check_rel_exist(model_table): |
| # plpy.error('Table ' + model_table + ' exists; please use a different model table or drop ' + model_table + ' before calling this function.'); |
| |
| # plpy.execute('drop table if exists ' + model_table); |
| plpy.execute('create table ' + model_table + ' ( id text, weight float8, sv float8[] ) m4_ifdef(`GREENPLUM', `distributed randomly')'); |
| plpy.execute('create table ' + model_table + '_param ( id text, intercept float8, kernel text ) m4_ifdef(`GREENPLUM', `distributed randomly')'); |
| |
| plpy.execute('create temp table svm_temp_result ( id text, model ' + madlib_schema + '.svm_model_rec ) m4_ifdef(`GREENPLUM', `distributed randomly')'); |
| |
| if (verbose): |
| plpy.info("Parameters:"); |
| plpy.info(" * input_table = %s" % input_table); |
| plpy.info(" * model_table = " + model_table); |
| plpy.info(" * parallel = " + str(parallel)); |
| plpy.info(" * kernel_func = " + kernel_func); |
| plpy.info(" * eta = " + str(eta)); |
| plpy.info(" * nu = " + str(nu)); |
| plpy.info(" * slambda = " + str(slambda)); |
| |
| if (parallel) : |
| # Learning multiple models in parallel |
| |
| # Start learning process |
| sql = 'insert into svm_temp_result (select \'' + model_table + '\' || m4_ifdef(`GREENPLUM', `gp_segment_id', `0'), ' + madlib_schema + '.svm_reg_agg(ind, label,\'' + kernel_func + '\',' + str(eta) + ',' + str(nu) + ',' + str(slambda) + ') from ' + input_table + ' group by 1)'; |
| plpy.execute( sql); |
| |
| # Store the models learned |
| plpy.execute('insert into ' + model_table + '_param select id, (model).b, \'' + kernel_func + '\' from svm_temp_result'); |
| numproc_t = plpy.execute('select count(distinct(m4_ifdef(`GREENPLUM', `gp_segment_id', `0'))) from ' + input_table); |
| numproc = numproc_t[0]['count']; |
| plpy.execute('select ' + madlib_schema + '.svm_store_model(\'svm_temp_result\', \'' + model_table + '\',\'' + model_table + '\', ' + str(numproc) + ')'); |
| |
| else : |
| # Learning a single model |
| |
| # Start learning process |
| sql = 'insert into svm_temp_result (select \'' + model_table + '\', ' + madlib_schema + '.svm_reg_agg(ind, label,\'' + kernel_func + '\',' + str(eta) + ',' + str(nu) + ',' + str(slambda) + ') from ' + input_table + ')'; |
| plpy.execute(sql); |
| # Store the model learned |
| plpy.execute('insert into ' + model_table + '_param select id, (model).b, \'' + kernel_func + '\' from svm_temp_result'); |
| plpy.execute('select ' + madlib_schema + '.svm_store_model(\'svm_temp_result\', \'' + model_table + '\', \'' + model_table + '\')'); |
| |
| # Retrieve and return the summary for each model learned |
| if parallel: |
| where_cond = "position('" + model_table + "' in id) > 0 AND '" + model_table + "' <> id"; |
| else: |
| where_cond = "id = '" + model_table + "'"; |
| |
| summary = plpy.execute("select id, (model).inds, (model).cum_err, (model).epsilon, (model).b, (model).nsvs from svm_temp_result where " + where_cond); |
| |
| result = []; |
| for i in range(0,summary.nrows()): |
| result = result + [(model_table, summary[i]['id'], summary[i]['inds'], summary[i]['cum_err'], summary[i]['epsilon'], summary[i]['b'], summary[i]['nsvs'])]; |
| |
| # Clean up temp storage of models |
| plpy.execute('drop table svm_temp_result'); |
| |
| return result; |
| |
| # ----------------------------------------------- |
| # Function to run the classification algorithm |
| # ----------------------------------------------- |
| def svm_classification( madlib_schema, input_table, model_table, parallel, kernel_func, verbose=False, eta=0.1, nu=0.005): |
| """ |
| Executes the support vector classification algorithm. |
| |
| @param input_table Name of table/view containing the training data |
| @param model_table Name under which we want to store the learned model |
| @param parallel A flag indicating whether the system should learn multiple models in parallel |
| @param kernel_func Kernel function |
| @param verbose Verbosity of reporting |
| @param eta Learning rate in (0,1] (default value is 0.1) |
| @param nu Compression parameter in (0,1] associated with the fraction of training data that will become support vectors (default value is 0.005) |
| |
| """ |
| |
| # Output error if model_table already exist |
| # if __check_rel_exist(model_table): |
| # plpy.error('Table ' + model_table + ' exists; please use a different model table or drop ' + model_table + ' before calling this function.'); |
| |
| # plpy.execute('drop table if exists ' + model_table); |
| plpy.execute('create table ' + model_table + ' ( id text, weight float8, sv float8[] ) m4_ifdef(`GREENPLUM', `distributed randomly')'); |
| plpy.execute('create table ' + model_table + '_param ( id text, intercept float8, kernel text ) m4_ifdef(`GREENPLUM', `distributed randomly')'); |
| |
| plpy.execute('create temp table svm_temp_result ( id text, model ' + madlib_schema + '.svm_model_rec ) m4_ifdef(`GREENPLUM', `distributed randomly')'); |
| |
| if (verbose): |
| plpy.info("Parameters:"); |
| plpy.info(" * input_table = " + input_table); |
| plpy.info(" * model_table = " + model_table); |
| plpy.info(" * parallel = " + str(parallel)); |
| plpy.info(" * eta = " + str(eta)); |
| plpy.info(" * nu = " + str(nu)); |
| |
| if (parallel) : |
| # Learning multiple models in parallel |
| |
| # Start learning process |
| sql = 'insert into svm_temp_result (select \'' + model_table + '\' || m4_ifdef(`GREENPLUM', `gp_segment_id', `0'), ' + madlib_schema + '.svm_cls_agg(ind, label,\'' + kernel_func + '\',' + str(eta) + ',' + str(nu) + ') from ' + input_table + ' group by 1)'; |
| plpy.execute(sql); |
| |
| # Store the models learned |
| plpy.execute('insert into ' + model_table + '_param select id, (model).b, \'' + kernel_func + '\' from svm_temp_result'); |
| numproc_t = plpy.execute('select count(distinct(m4_ifdef(`GREENPLUM', `gp_segment_id', `0'))) from ' + input_table); |
| numproc = numproc_t[0]['count']; |
| plpy.execute('select ' + madlib_schema + '.svm_store_model(\'svm_temp_result\', \'' + model_table + '\',\'' + model_table + '\', ' + str(numproc) + ')'); |
| |
| else : |
| # Learning a single model |
| |
| # Start learning process |
| sql = 'insert into svm_temp_result (select \'' + model_table + '\', ' + madlib_schema + '.svm_cls_agg(ind, label,\'' + kernel_func + '\',' + str(eta) + ',' + str(nu) + ') from ' + input_table + ')'; |
| plpy.execute(sql); |
| |
| # Store the model learned |
| plpy.execute('insert into ' + model_table + '_param select id, (model).b, \'' + kernel_func + '\' from svm_temp_result'); |
| plpy.execute('select ' + madlib_schema + '.svm_store_model(\'svm_temp_result\', \'' + model_table + '\', \'' + model_table + '\')'); |
| |
| # Retrieve and return the summary for each model learned |
| if parallel: |
| where_cond = "position('" + model_table + "' in id) > 0 AND '" + model_table + "' <> id"; |
| else: |
| where_cond = "id = '" + model_table + "'"; |
| |
| summary = plpy.execute("select id, (model).inds, (model).cum_err, (model).rho, (model).b, (model).nsvs from svm_temp_result where " + where_cond); |
| |
| result = []; |
| for i in range(0,summary.nrows()): |
| result = result + [(model_table, summary[i]['id'], summary[i]['inds'], summary[i]['cum_err'], summary[i]['rho'], summary[i]['b'], summary[i]['nsvs'])]; |
| |
| # Clean up temp storage of models |
| plpy.execute('drop table svm_temp_result'); |
| |
| return result; |
| |
| # ----------------------------------------------- |
| # Function to run the novelty detection algorithm |
| # ----------------------------------------------- |
| def svm_novelty_detection( madlib_schema, input_table, model_table, parallel, kernel_func, verbose=False, eta = 0.1, nu = 0.01): |
| """ |
| Executes the support vector novelty detection algorithm. |
| |
| @param input_table Name of table/view containing the training data |
| @param model_table Name of table under which we want to store the learned model |
| @param parallel A flag indicating whether the system should learn multiple models in parallel. |
| @param kernel_func Kernel function |
| @param verbose Verbosity of reporting |
| @param eta Learning rate in (0,1] (default value is 0.1) |
| @param nu Compression parameter in (0,1] associated with the fraction of training data that will become support vectors (default value is 0.01) |
| """ |
| |
| # Output error if model_table already exist |
| # if __check_rel_exist(model_table): |
| # plpy.error('Table ' + model_table + ' exists; please use a different model table or drop ' + model_table + ' before calling this function.'); |
| |
| # plpy.execute('drop table if exists ' + model_table); |
| plpy.execute('create table ' + model_table + ' ( id text, weight float8, sv float8[] ) m4_ifdef(`GREENPLUM', `distributed randomly')'); |
| plpy.execute('create table ' + model_table + '_param ( id text, intercept float8, kernel text ) m4_ifdef(`GREENPLUM', `distributed randomly')'); |
| |
| plpy.execute('create temp table svm_temp_result ( id text, model ' + madlib_schema + '.svm_model_rec ) m4_ifdef(`GREENPLUM', `distributed randomly')'); |
| |
| if (verbose): |
| plpy.info("Parameters:"); |
| plpy.info(" * input_table = " + input_table); |
| plpy.info(" * model_table = " + model_table); |
| plpy.info(" * parallel = " + str(parallel)); |
| plpy.info(" * eta = " + str(eta)); |
| plpy.info(" * nu = " + str(nu)); |
| |
| if (parallel) : |
| # Learning multiple models in parallel |
| |
| # Start learning process |
| sql = 'insert into svm_temp_result (select \'' + model_table + '\' || m4_ifdef(`GREENPLUM', `gp_segment_id', `0'), ' + madlib_schema + '.svm_nd_agg(ind,\'' + kernel_func + '\',' + str(eta) + ',' + str(nu) + ') from ' + input_table + ' group by 1)'; |
| plpy.execute(sql); |
| |
| # Store the models learned |
| plpy.execute('insert into ' + model_table + '_param select id, (model).rho * -1.0, \'' + kernel_func + '\' from svm_temp_result'); |
| numproc_t = plpy.execute('select count(distinct(m4_ifdef(`GREENPLUM', `gp_segment_id', `0'))) from ' + input_table); |
| numproc = numproc_t[0]['count']; |
| plpy.execute('select ' + madlib_schema + '.svm_store_model(\'svm_temp_result\', \'' + model_table + '\',\'' + model_table + '\', ' + str(numproc) + ')'); |
| |
| else : |
| # Learning a single model |
| |
| # Start learning process |
| sql = 'insert into svm_temp_result (select \'' + model_table + '\', ' + madlib_schema + '.svm_nd_agg(ind,\'' + kernel_func + '\',' + str(eta) + ',' + str(nu) + ') from ' + input_table + ')'; |
| plpy.execute(sql); |
| |
| # Store the model learned |
| plpy.execute('insert into ' + model_table + '_param select id, (model).rho * -1.0, \'' + kernel_func + '\' from svm_temp_result'); |
| plpy.execute('select ' + madlib_schema + '.svm_store_model(\'svm_temp_result\', \'' + model_table + '\', \'' + model_table + '\')'); |
| |
| # Retrieve and return the summary for each model learned |
| if parallel: |
| where_cond = "position('" + model_table + "' in id) > 0 AND '" + model_table + "' <> id"; |
| else: |
| where_cond = "id = '" + model_table + "'"; |
| |
| summary = plpy.execute("select id, (model).inds, (model).rho, (model).nsvs from svm_temp_result where " + where_cond); |
| |
| result = []; |
| for i in range(0,summary.nrows()): |
| result = result + [(model_table, summary[i]['id'], summary[i]['inds'], summary[i]['rho'], summary[i]['nsvs'])]; |
| |
| # Clean up the temp storage of models |
| plpy.execute('drop table svm_temp_result'); |
| |
| return result; |
| |
| # ---------------------------------------------- |
| # Function to predict the labels of a data point |
| # ---------------------------------------------- |
def svm_predict(model_table, ind):
    """
    Scores a data point using a learned support vector model.

    The score is sum(weight * kernel(sv, ind)) over the rows of model_table
    plus the intercept read from model_table_param.

    @param model_table The table storing the learned model to be used
    @param ind The data point to be scored

    @return The score computed by the database for the data point
    """

    nmodels_rec = plpy.execute("select count(distinct id) from " + model_table)

    # "!=" instead of the Python-2-only "<>" operator.
    if nmodels_rec[0]['count'] != 1:
        plpy.error("Error in svm_predict(): the table contains an ensemble of models")

    param_t = plpy.execute("select * from " + model_table + "_param")
    # Report a missing parameter row explicitly instead of an IndexError.
    if param_t.nrows() == 0:
        plpy.error("Error in svm_predict(): no parameters found in " + model_table + "_param")
    intercept = param_t[0]['intercept']
    kernel_func = param_t[0]['kernel']

    # str(ind)[1:-1] drops the enclosing brackets of the textual form of ind,
    # leaving a comma separated list for the SQL array constructor.
    ret = plpy.execute("SELECT sum(weight * " + kernel_func + "(sv, array[" + str(ind)[1:-1] + "]::float8[])) + " + str(intercept) + " as sum FROM " + model_table)
    if ret.nrows() == 0:
        plpy.error("Error executing svm_predict()")

    return ret[0]['sum']
| |
| # ---------------------------------------------- |
| # Function to predict the labels of a data point |
| # ---------------------------------------------- |
def svm_predict_combo(madlib_schema, model_table, ind):
    """
    Scores a data point using an ensemble of support vector models.

    The models are looked up under the ids model_table0, model_table1, ...
    up to the number of distinct ids found in model_table.

    @param madlib_schema Name of the schema that holds the svm_predict function
    @param model_table The table storing the learned model to be used
    @param ind The data point to be scored

    @return List of (model id, prediction) tuples followed by a final
            tuple with id avg holding the mean prediction; when the table
            holds at most one model, a single (model_table, prediction) tuple
    """

    nmodels_rec = plpy.execute("select count(distinct id) from " + model_table)
    nmodels = nmodels_rec[0]['count']

    # str(ind)[1:-1] drops the enclosing brackets of the textual form of ind.
    # The explicit cast matches svm_predict and keeps an empty point valid.
    point_sql = "array[" + str(ind)[1:-1] + "]::float8[]"

    if nmodels <= 1:
        plpy.info("svm_predict_combo(): not an ensemble of models, will use a single prediction")
        pred = plpy.execute("select * from " + madlib_schema + ".svm_predict('" + model_table + "', " + point_sql + ")")
        return [(model_table, pred[0]['svm_predict'])]

    sumpr = 0.0
    ret = []

    for i in range(0, nmodels):
        model_id = model_table + str(i)

        param_t = plpy.execute("select * from " + model_table + "_param where id = '" + model_id + "'")
        # Report a missing parameter row explicitly instead of an IndexError.
        if param_t.nrows() == 0:
            plpy.error("svm_predict_combo(): failed to compute prediction for model '" + model_id + "'.")
        intercept = param_t[0]['intercept']
        kernel_func = param_t[0]['kernel']

        pred = plpy.execute("select sum(weight * " + kernel_func + "(sv, " + point_sql + ")) + " + str(intercept) + " as sum from " + model_table + " where id = '" + model_id + "'")
        # An aggregate without GROUP BY always yields one row, so a model
        # without support vectors shows up as a NULL sum, not as zero rows.
        if pred.nrows() == 0 or pred[0]['sum'] is None:
            plpy.error("svm_predict_combo(): failed to compute prediction for model '" + model_id + "'.")

        prediction = pred[0]['sum']
        sumpr = sumpr + prediction
        ret.append((model_id, prediction))

    ret.append(('avg', sumpr/nmodels))
    return ret
| |
| # --------------------------------------------------- |
| # Function to predict the labels of points in a table |
| # --------------------------------------------------- |
| def svm_predict_batch( input_table, data_col, id_col, model_table, output_table, parallel): |
| """ |
| Scores the data points stored in a table using a learned support vector model. |
| |
| @param input_table Name of table/view containing the data points to be scored |
| @param data_col Name of column in input_table containing the data points |
| @param id_col Name of column in input_table containing (integer) identifier for data point |
| @param model_table Name of learned model |
| @param output_table Name of table to store the results |
| @param parallel A flag indicating whether the system should learn multiple models in parallel |
| |
| """ |
| |
| plpy.execute('drop table if exists ' + output_table); |
| plpy.execute('create table ' + output_table + ' ( id int, prediction float8 ) m4_ifdef(`GREENPLUM', `distributed by (id)')'); |
| |
| if (parallel) : |
| num_models_t = plpy.execute('SELECT COUNT(DISTINCT(id)) n FROM ' + model_table + ' WHERE position(\'' + model_table + '\' in id) > 0 AND \'' + model_table + '\' <> id;'); |
| num_models = num_models_t[0]['n']; |
| |
| for i in range(0,num_models): |
| param_t = plpy.execute('SELECT * FROM ' + model_table + '_param WHERE id = \'' + model_table + str(i) + '\'') |
| intercept = param_t[0]['intercept'] |
| kernel_func = param_t[0]['kernel'] |
| sql = 'insert into ' + output_table + '(select t.' + id_col + ', sum(weight * ' + kernel_func + '(m.sv, t.' + data_col + ')) + ' + str(intercept) + ' from ' + model_table + ' m, ' + input_table + ' t where m.id = \'' + model_table + str(i) + '\' group by 1)'; |
| plpy.execute(sql); |
| |
| else : |
| param_t = plpy.execute('SELECT * FROM ' + model_table + '_param'); |
| intercept = param_t[0]['intercept'] |
| kernel_func = param_t[0]['kernel'] |
| sql = 'insert into ' + output_table + '(select t.' + id_col + ', sum(weight * ' + kernel_func + '(m.sv, t.' + data_col + ')) + ' + str(intercept) + ' from ' + model_table + ' m, ' + input_table + ' t where m.id = \'' + model_table + '\' group by 1)'; |
| plpy.execute(sql); |
| |
| return '''Finished processing data points in %s table; results are stored in %s table. |
| ''' % (input_table,output_table) |
| |