| #!/usr/bin/env python |
| |
| import plpy |
| from utilities.validate_args import table_exists |
| from utilities.validate_args import columns_exist_in_table |
| from utilities.validate_args import table_is_empty |
| from utilities.validate_args import cols_in_tbl_valid |
| from utilities.validate_args import input_tbl_valid |
| from utilities.validate_args import output_tbl_valid |
| from utilities.validate_args import regproc_valid |
| |
def __svm_validate_parameters(madlib_schema, input_table, model_table, parallel,
                              kernel_func, verbose, eta, nu, slambda = 0.2, check_label=True):
    """
    Validate the arguments shared by the SVM training entry points.

    Table, column and kernel checks are delegated to the helpers imported
    from utilities.validate_args; the remaining checks report problems
    through plpy.error.

    @param madlib_schema Name of the MADlib schema (accepted for signature
                         compatibility; not inspected here)
    @param input_table Name of table/view containing the training data
    @param model_table Name of table under which we want to store the learned model
    @param parallel A flag indicating whether the system should learn multiple models
                    in parallel; must not be None
    @param kernel_func Kernel function; must be a non-empty name that regproc_valid
                       accepts for the argument list (float8[], float8[], float8)
    @param verbose Verbosity of reporting (not inspected here)
    @param eta Learning rate in (0,1]
    @param nu Compression parameter in (0,1] associated with the fraction of training
              data that will become support vectors
    @param slambda Regularisation parameter (default value is 0.2; not range-checked here)
    @param check_label When True (the default) the input table must provide a 'label'
                       column in addition to 'id' and 'ind'
    """
    input_tbl_valid(input_table, 'SVM')
    if check_label:
        required_cols = ['id', 'ind', 'label']
    else:
        required_cols = ['id', 'ind']
    cols_in_tbl_valid(input_table, required_cols, 'SVM')

    output_tbl_valid(model_table, 'SVM')

    if parallel is None:
        plpy.error("SVM error: Argument 'parallel' should not be NULL!")

    if kernel_func is None or kernel_func == '':
        plpy.error("SVM error: Invalid kernel function is provided!")
    regproc_valid(kernel_func, "float8[], float8[], float8", 'SVM')

    # NULL is rejected explicitly rather than through the way None happens to
    # compare with numbers, and the chained comparison also rejects NaN, which
    # a pair of "<= 0 or > 1" tests would let through.
    if eta is None or not (0 < eta <= 1):
        plpy.error("SVM error: Learning rate eta should in range (0,1].")

    if nu is None or not (0 < nu <= 1):
        plpy.error("SVM error: Compression parameter nu should in range (0,1].")
| |
def __svm_predict_validate_parameters(input_table, data_col, id_col, model_table, output_table, parallel):
    """
    Check the arguments of the batch prediction entry point.

    @param input_table Name of table/view containing the data points to be scored
    @param data_col Name of column in input_table containing the data points
    @param id_col Name of column in input_table containing (integer) identifier for data point
    @param model_table Name of learned model
    @param output_table Name of table to store the results
    @param parallel A flag indicating whether multiple models were learned in parallel;
                    must not be None
    """
    module_name = 'SVM'

    # Data to be scored: the table first, then the two columns read from it
    input_tbl_valid(input_table, module_name)
    scored_columns = [data_col, id_col]
    cols_in_tbl_valid(input_table, scored_columns, module_name)

    # The learned model goes through the same check as an input table
    input_tbl_valid(model_table, module_name)

    output_tbl_valid(output_table, module_name)

    if parallel is not None:
        return
    plpy.error("SVM error: Argument 'parallel' should not be NULL!")
| |
| # ----------------------------------------------- |
| # Function to run the regression algorithm |
| # ----------------------------------------------- |
def svm_regression(madlib_schema, input_table, model_table, parallel, kernel_func,
                   verbose = False, eta = 0.1, nu = 0.005, slambda = 0.2,
                   kernel_param = 1.0):
    """
    Learn support vector regression model(s) from input_table.

    The support vectors are stored in model_table and the per-model
    intercept and kernel settings in model_table + "_param". A temp table
    named svm_temp_result holds the aggregate output while the function
    runs and is dropped before returning.

    @param madlib_schema Schema providing svm_reg_agg, svm_store_model and
                         the svm_model_rec type
    @param input_table Name of table/view containing the training data
    @param model_table Name of table under which we want to store the learned model
    @param parallel A flag indicating whether the system should learn multiple
                    models in parallel
    @param kernel_func Kernel function
    @param verbose Verbosity of reporting
    @param eta Learning rate in (0,1] (default value is 0.1)
    @param nu Compression parameter in (0,1] associated with the fraction of
              training data that will become support vectors (default value is 0.005)
    @param slambda Regularisation parameter (default value is 0.2)
    @param kernel_param Last argument handed to the aggregate; also recorded in
                        the "_param" table (default value is 1.0)

    @return List with one tuple per summary row:
            (model_table, id, inds, cum_err, epsilon, b, nsvs)
    """
    __svm_validate_parameters(madlib_schema, input_table, model_table, parallel,
                              kernel_func, verbose, eta, nu, slambda)

    # Storage clause shared by the three tables created below
    storage = "m4_ifdef(`__POSTGRESQL__', `', `distributed randomly')"

    plpy.execute("".join(["create table ", model_table,
                          " ( id text, weight float8, sv float8[] ) ", storage]))
    plpy.execute("".join(["create table ", model_table,
                          "_param ( id text, intercept float8, kernel text, kernel_param float8) ",
                          storage]))
    plpy.execute("".join(["create temp table svm_temp_result ( id text, model ",
                          madlib_schema, ".svm_model_rec ) ", storage]))

    if verbose:
        report = ["Parameters:",
                  " * input_table = %s" % input_table,
                  " * model_table = " + model_table,
                  " * parallel = " + str(parallel),
                  " * kernel_func = " + kernel_func,
                  " * eta = " + str(eta),
                  " * nu = " + str(nu),
                  " * slambda = " + str(slambda)]
        for line in report:
            plpy.info(line)

    # The aggregate call is the same for both learning modes
    numeric_args = ",".join([str(eta), str(nu), str(slambda), str(kernel_param)])
    agg_call = madlib_schema + ".svm_reg_agg(ind, label, '" + kernel_func \
        + "'," + numeric_args + ")"

    if parallel:
        # One model per segment: the model id is model_table plus the segment id
        id_expr = "'" + model_table + "' || " \
            + "m4_ifdef(`__POSTGRESQL__', `0', `gp_segment_id')"
        grouping = " group by 1"
    else:
        # A single model whose id is model_table itself
        id_expr = "'" + model_table + "'"
        grouping = ""

    # Start learning process
    plpy.execute("insert into svm_temp_result (select " + id_expr + ", "
                 + agg_call + " from " + input_table + grouping + ")")

    # Record intercept and kernel settings of every model learned
    plpy.execute("".join(["insert into ", model_table,
                          "_param select id, (model).b, '", kernel_func,
                          "', ", str(kernel_param), " from svm_temp_result"]))

    # Store the support vectors and pick the filter for the summary query
    if parallel:
        __svm_store_model(madlib_schema, input_table, model_table,
                          'svm_temp_result', model_table)
        where_cond = "position('" + model_table + "' in id) > 0 AND '" \
            + model_table + "' <> id"
    else:
        plpy.execute("".join(["select ", madlib_schema,
                              ".svm_store_model('svm_temp_result', '",
                              model_table, "', '", model_table, "')"]))
        where_cond = "id = '" + model_table + "'"

    # Retrieve the summary for each model learned
    summary = plpy.execute("select id, (model).inds, (model).cum_err, "
                           "(model).epsilon, (model).b, (model).nsvs from "
                           "svm_temp_result where " + where_cond)

    fields = ('id', 'inds', 'cum_err', 'epsilon', 'b', 'nsvs')
    result = [(model_table,) + tuple(summary[row][name] for name in fields)
              for row in range(summary.nrows())]

    # Clean up temp storage of models
    plpy.execute('drop table svm_temp_result')

    return result
| |
| # ----------------------------------------------- |
| # Function to run the classification algorithm |
| # ----------------------------------------------- |
def svm_classification(madlib_schema, input_table, model_table, parallel,
                       kernel_func, verbose=False, eta=0.1, nu=0.005,
                       kernel_param=1.0):
    """
    Learn support vector classification model(s) from input_table.

    The support vectors are stored in model_table and the per-model
    intercept and kernel settings in model_table + "_param". A temp table
    named svm_temp_result holds the aggregate output while the function
    runs and is dropped before returning.

    @param madlib_schema Schema providing svm_cls_agg, svm_store_model and
                         the svm_model_rec type
    @param input_table Name of table/view containing the training data
    @param model_table Name under which we want to store the learned model
    @param parallel A flag indicating whether the system should learn
                    multiple models in parallel
    @param kernel_func Kernel function
    @param verbose Verbosity of reporting
    @param eta Learning rate in (0,1] (default value is 0.1)
    @param nu Compression parameter in (0,1] associated with the fraction of
              training data that will become support vectors (default value is 0.005)
    @param kernel_param Last argument handed to the aggregate; also recorded in
                        the "_param" table (default value is 1.0)

    @return List with one tuple per summary row:
            (model_table, id, inds, cum_err, rho, b, nsvs)
    """
    __svm_validate_parameters(madlib_schema, input_table, model_table, parallel,
                              kernel_func, verbose, eta, nu)

    # Storage clause shared by the three tables created below
    distribution = "m4_ifdef(`__POSTGRESQL__', `', `distributed randomly')"
    param_table = model_table + "_param"

    plpy.execute("create table " + model_table
                 + " ( id text, weight float8, sv float8[] ) " + distribution)
    plpy.execute("create table " + param_table
                 + " ( id text, intercept float8, kernel text, kernel_param float8) "
                 + distribution)
    plpy.execute("create temp table svm_temp_result ( id text, model "
                 + madlib_schema + ".svm_model_rec ) " + distribution)

    if verbose:
        for line in ("Parameters:",
                     " * input_table = " + input_table,
                     " * model_table = " + model_table,
                     " * parallel = " + str(parallel),
                     " * eta = " + str(eta),
                     " * nu = " + str(nu)):
            plpy.info(line)

    # Aggregate invocation used by both learning modes
    learner = madlib_schema + ".svm_cls_agg(ind, label,'" + kernel_func + "'," \
        + ",".join([str(eta), str(nu), str(kernel_param)]) + ")"

    if parallel:
        # One model per segment, named model_table plus the segment id
        model_id_sql = "'" + model_table + "' || " \
            + "m4_ifdef(`__POSTGRESQL__', `0', `gp_segment_id')"
        tail = " group by 1)"
    else:
        # A single model named after model_table
        model_id_sql = "'" + model_table + "'"
        tail = ")"

    # Start learning process
    plpy.execute("insert into svm_temp_result (select " + model_id_sql + ", "
                 + learner + " from " + input_table + tail)

    # Record intercept and kernel settings of every model learned
    plpy.execute("insert into " + param_table
                 + " select id, (model).b, '" + kernel_func
                 + "', " + str(kernel_param)
                 + " from svm_temp_result")

    # Store the support vectors and pick the filter for the summary query
    if parallel:
        __svm_store_model(madlib_schema, input_table, model_table,
                          'svm_temp_result', model_table)
        where_cond = "position('" + model_table + "' in id) > 0 AND '" \
            + model_table + "' <> id"
    else:
        plpy.execute("select " + madlib_schema + ".svm_store_model('svm_temp_result', '"
                     + model_table + "', '" + model_table + "')")
        where_cond = "id = '" + model_table + "'"

    # Retrieve the summary for each model learned
    summary = plpy.execute("select id, (model).inds, (model).cum_err, (model).rho, "
                           "(model).b, (model).nsvs from svm_temp_result where "
                           + where_cond)

    result = [(model_table, summary[k]['id'], summary[k]['inds'],
               summary[k]['cum_err'], summary[k]['rho'],
               summary[k]['b'], summary[k]['nsvs'])
              for k in range(summary.nrows())]

    # Clean up temp storage of models
    plpy.execute('drop table svm_temp_result')

    return result
| |
| # ----------------------------------------------- |
| # Function to run the novelty detection algorithm |
| # ----------------------------------------------- |
def svm_novelty_detection(madlib_schema, input_table, model_table, parallel,
                          kernel_func, verbose=False, eta = 0.1, nu = 0.01,
                          kernel_param = 1.0):
    """
    Executes the support vector novelty detection algorithm.

    The support vectors are stored in model_table and the per-model
    intercept (rho negated) and kernel settings in model_table + "_param".
    A temp table named svm_temp_result holds the aggregate output while the
    function runs and is dropped before returning.

    @param madlib_schema Schema providing svm_nd_agg, svm_store_model and
                         the svm_model_rec type
    @param input_table Name of table/view containing the training data
    @param model_table Name of table under which we want to store the learned model
    @param parallel A flag indicating whether the system should learn multiple
                    models in parallel.
    @param kernel_func Kernel function
    @param verbose Verbosity of reporting (None is treated as False)
    @param eta Learning rate in (0,1] (None is treated as 0.1)
    @param nu Compression parameter in (0,1] associated with the fraction of training
              data that will become support vectors (None is treated as 0.01)
    @param kernel_param Last argument handed to the aggregate; also recorded in
                        the "_param" table (default value is 1.0)

    @return List with one tuple per summary row:
            (model_table, id, inds, rho, nsvs)
    """
    # Substitute the defaults BEFORE validating: the validator range-checks
    # eta and nu, so a None value would be rejected there and the defaults
    # below would never take effect.
    if verbose is None:
        verbose = False
    if eta is None:
        eta = .1
    if nu is None:
        nu = .01

    __svm_validate_parameters(madlib_schema, input_table, model_table,
                              parallel, kernel_func, verbose, eta, nu, check_label=False)

    plpy.execute("create table " + model_table + " ( id text, weight float8, sv float8[] ) "
                 "m4_ifdef(`__POSTGRESQL__', `', `distributed randomly')")
    plpy.execute("create table " + model_table
                 + "_param ( id text, intercept float8, kernel text, kernel_param float8) "
                 "m4_ifdef(`__POSTGRESQL__', `', `distributed randomly')")

    plpy.execute("create temp table svm_temp_result ( id text, model " + madlib_schema
                 + ".svm_model_rec ) m4_ifdef(`__POSTGRESQL__', `', `distributed randomly')")

    if verbose:
        plpy.info("Parameters:")
        plpy.info(" * input_table = " + input_table)
        plpy.info(" * model_table = " + model_table)
        plpy.info(" * parallel = " + str(parallel))
        plpy.info(" * eta = " + str(eta))
        plpy.info(" * nu = " + str(nu))

    if parallel:
        # Learning multiple models in parallel: one model per segment, named
        # model_table plus the segment id
        sql = "insert into svm_temp_result (select '" + model_table \
            + "' || m4_ifdef(`__POSTGRESQL__', `0', `gp_segment_id'), " \
            + madlib_schema + ".svm_nd_agg(ind,'" + kernel_func + "'," \
            + str(eta) + "," + str(nu) \
            + "," + str(kernel_param) + ") from " + input_table + " group by 1)"
    else:
        # Learning a single model named after model_table
        sql = "insert into svm_temp_result (select '" + model_table \
            + "', " + madlib_schema + ".svm_nd_agg(ind,'" + kernel_func \
            + "'," + str(eta) + "," + str(nu) \
            + "," + str(kernel_param) + ") from " + input_table + ")"

    # Start learning process
    plpy.execute(sql)

    # Record the intercept (negated rho) and kernel settings of every model
    plpy.execute("insert into " + model_table
                 + "_param select id, (model).rho * -1.0, '" + kernel_func
                 + "', " + str(kernel_param)
                 + " from svm_temp_result")

    # Store the support vectors and pick the filter for the summary query
    if parallel:
        __svm_store_model(madlib_schema, input_table, model_table,
                          'svm_temp_result', model_table)
        where_cond = "position('" + model_table + "' in id) > 0 AND '" \
            + model_table + "' <> id"
    else:
        plpy.execute("select " + madlib_schema
                     + ".svm_store_model('svm_temp_result', '"
                     + model_table + "', '" + model_table + "')")
        where_cond = "id = '" + model_table + "'"

    # Retrieve the summary for each model learned
    summary = plpy.execute("select id, (model).inds, (model).rho, "
                           "(model).nsvs from svm_temp_result where " + where_cond)

    result = [(model_table, summary[i]['id'], summary[i]['inds'],
               summary[i]['rho'], summary[i]['nsvs'])
              for i in range(summary.nrows())]

    # Clean up the temp storage of models
    plpy.execute('drop table svm_temp_result')

    return result
| |
| # --------------------------------------------------- |
| # Function to predict the labels of points in a table |
| # --------------------------------------------------- |
def svm_predict_batch( input_table, data_col, id_col, model_table, output_table, parallel):
    """
    Scores the data points stored in a table using a learned support vector model.

    output_table is created here with the columns (id int, prediction float8).
    With parallel set, every model whose id contains model_table (and is not
    equal to it) is applied in turn and its predictions are appended to
    output_table; otherwise only the model whose id equals model_table is
    applied.

    @param input_table Name of table/view containing the data points to be scored
    @param data_col Name of column in input_table containing the data points
    @param id_col Name of column in input_table containing (integer) identifier for data point
    @param model_table Name of learned model
    @param output_table Name of table to store the results
    @param parallel A flag indicating whether multiple models were learned in parallel

    @return A message naming the scored table and the table holding the results
    """
    __svm_predict_validate_parameters(input_table, data_col, id_col, model_table, output_table, parallel)

    plpy.execute("create table " + output_table + " ( id int, prediction float8 ) "
                 "m4_ifdef(`__POSTGRESQL__', `', `distributed by (id)')")

    param_table = model_table + "_param"

    def score_with_model(model_id, param_sql):
        # Apply one stored model to every input row and append the predictions
        param_t = plpy.execute(param_sql)
        if len(param_t) == 0:
            # Without this guard the lookup below fails with a bare IndexError
            plpy.error("SVM error: No parameters found for model '" + model_id
                       + "' in table " + param_table + "!")
        intercept = param_t[0]['intercept']
        kernel_func = param_t[0]['kernel']
        kernel_param = param_t[0]['kernel_param']
        # repr() instead of str(): on Python 2 str() of a float keeps only 12
        # significant digits, which would silently round the stored float8
        # intercept and kernel parameter before they reach the scoring query.
        plpy.execute('insert into ' + output_table + '(select t.' + id_col
                     + ', sum(weight * ' + kernel_func + '(m.sv, t.' + data_col
                     + ', ' + repr(kernel_param) + '::float8)) + '
                     + repr(intercept)
                     + ' from ' + model_table + ' m, ' + input_table
                     + ' t where m.id = \'' + model_id + '\' group by 1)')

    if parallel:
        model_ids_t = plpy.execute("SELECT DISTINCT(id) model_id FROM " + model_table
                                   + " WHERE position('" + model_table
                                   + "' in id) > 0 AND '" + model_table + "\' <> id;")
        for i in range(len(model_ids_t)):
            model_id = model_ids_t[i]['model_id']
            score_with_model(model_id,
                             "SELECT * FROM " + param_table + " WHERE id = '"
                             + model_id + "'")
    else:
        score_with_model(model_table, 'SELECT * FROM ' + param_table)

    return '''Finished processing data points in %s table; results are stored in %s table.
    ''' % (input_table,output_table)
| |
# ------------------------------------------------------------------------------
# This function stores a collection of models learned in parallel into model_table.
# The individual models are read from model_temp_table and are assumed to be named
# model_name followed by a segment id (e.g. model_name0, model_name1, ...).
# ------------------------------------------------------------------------------
def __svm_store_model(madlib_schema, input_table, model_table, model_temp_table, model_name):
    """
    Store the models learned in parallel, one call per segment id.

    The distinct segment ids are read from input_table. For each of them
    the svm_store_model function of madlib_schema is invoked with
    model_temp_table, model_name followed by the segment id, and model_table.

    @param madlib_schema Schema that provides svm_store_model
    @param input_table Table/view whose rows determine the segment ids
    @param model_table Name passed as the last argument of svm_store_model
    @param model_temp_table Name passed as the first argument of svm_store_model
    @param model_name Prefix of the per-segment model names
    """
    segments = plpy.execute("select distinct(m4_ifdef(`__POSTGRESQL__', `0', `gp_segment_id')) "
                            "AS seg_id from " + input_table)
    for pos in range(len(segments)):
        segment_model = model_name + str(segments[pos]['seg_id'])
        plpy.execute("".join(["select ", madlib_schema, ".svm_store_model('",
                              model_temp_table, "', '", segment_model,
                              "', '", model_table, "')"]))