from utilities.control import MinWarning
from utilities.utilities import unique_string
from validation.cv_utils import __cv_copy_data_with_id
from validation.cv_utils import __cv_split_data_using_id_col
from validation.cv_utils import __cv_split_data_using_id_tbl
from validation.cv_utils import __cv_summarize_result
from validation.cv_utils import __cv_generate_random_id
from utilities.utilities import __mad_version
from utilities.utilities import preprocess_keyvalue_params
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_exists
from utilities.validate_args import get_cols
import plpy
version_wrapper = __mad_version()
mad_vec = version_wrapper.select_vecfunc()
def __cv_combine_params_type_general(params, params_type, tbl_data,
col_random_id, param_explored,
explore_value, tbl_input, tbl_output,
grp_to_param_tbl=None,
add_param_quotes=True):
"""
Create argument list for SQL functions for training, validation and metric measuring
Args:
@param params A string list of parameter names
@param params_type A string list of parameter types
@param tbl_data Data table name (training, validation)
@param col_random_id The random ID column, a string
@param param_explored The name that CV runs through
@param explore_value The current value of param_explored that is under study, already converted to string
@param tbl_input The input table name for SQL function
@param tbl_output The output table name for SQL function
Returns:
Output is a string, where all parameters are force-casted into their type.
"""
    if len(params) != len(params_type):
        plpy.error("CV error: The number of parameters ({0}) "
                   "should be equal to the number of types ({1})!".
                   format(len(params), len(params_type)))
rst = []
    # Special keywords substituted below: %data%, %id%, %error%, %model%,
    # %prediction%, %group_param_tbl% ("%explore%" and param_explored are
    # handled separately by _replace_explore)
for i, p in enumerate(params):
if not p or p.upper() == 'NULL':
rst.append("NULL")
else:
p = p.strip()
p_type = params_type[i].strip('"')
if p == "%data%":
rst.append("\'" + tbl_data + "\'::" + p_type)
elif p == "%id%":
rst.append("\'" + col_random_id + "\'::" + p_type)
elif p == "%error%":
rst.append("\'" + tbl_output + "\'::" + p_type)
elif p == "%model%":
if tbl_input is None:
rst.append("\'" + tbl_output + "\'::" + p_type)
else:
rst.append("\'" + tbl_input + "\'::" + p_type)
elif p == "%prediction%":
if "%error%" in set(params):
rst.append("\'" + tbl_input + "\'::" + p_type)
else:
rst.append("\'" + tbl_output + "\'::" + p_type)
elif p == "%group_param_tbl%":
rst.append("\'" + tbl_output + "\'::" + p_type)
else:
if add_param_quotes:
p = "'{0}'".format(p)
rst.append(p + "::" + p_type)
return ','.join(rst)
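# Illustrative example (hypothetical values): with
#   params      = ["%data%", "%model%", "0.1"]
#   params_type = ["VARCHAR", "VARCHAR", "DOUBLE PRECISION"]
#   tbl_data = "tbl_train", tbl_input = None, tbl_output = "model_out"
# the return value is
#   "'tbl_train'::VARCHAR,'model_out'::VARCHAR,'0.1'::DOUBLE PRECISION"
# which __cv_funcall_general splices into "SELECT func(...)".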
# ------------------------------------------------------------------------
def _replace_explore(params, param_explored, explore_value):
"""
Args:
@param arg
Returns:
"""
    for i, p in enumerate(params):
        if p == param_explored or (p is not None and "%explore%" in p):
            if "=" in p:
                # p is a string of key-value pairs separated by ','
                pairs = preprocess_keyvalue_params(p)
                param_list = []
                for each_pair in pairs:
                    # do not reuse the loop variable i here: in Python 2 a
                    # list comprehension leaks its variable into this scope,
                    # which would corrupt the index used below
                    key_value = [s.strip() for s in each_pair.split("=")]
                    if key_value[1] == "%explore%":
                        param_list.append(str(key_value[0]) + "=" +
                                          str(explore_value))
                    else:
                        param_list.append(each_pair)
                params[i] = ','.join(param_list)
            else:
                params[i] = str(explore_value)
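# Illustrative example (hypothetical values, assuming
# preprocess_keyvalue_params splits "a=1, b=2" into ["a=1", "b=2"]): with
# param_explored = "max_depth" and explore_value = 5,
#   params = ["max_depth", "max_depth=%explore%, min_split=4"]
# is rewritten in-place to roughly
#   params = ["5", "max_depth=5,min_split=4"]
# (an exact match is replaced wholesale; in a key-value string only the
# pair whose value is %explore% is rewritten).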
# ----------------------------------------------------------------------
def __cv_funcall_general(func, params, params_type, tbl_data, col_random_id,
param_explored, explore_value, tbl_input,
tbl_output, add_param_quotes=True):
"""
    Call the training, validation, or metric-measuring SQL function.
    @param func The name of the SQL function to be called, a string
    For all other parameters, see the docstring of
    __cv_combine_params_type_general.
"""
arg_string = __cv_combine_params_type_general(
params, params_type, tbl_data, col_random_id,
param_explored, explore_value, tbl_input, tbl_output,
add_param_quotes=add_param_quotes)
sql = "SELECT {func}({arg_string})".format(func=func, arg_string=arg_string)
plpy.execute(sql)
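# For example (hypothetical function and table names), with
# func = "my_train_func" the executed statement looks like
#   SELECT my_train_func('tbl_train'::VARCHAR, ..., 'model_out'::VARCHAR)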
# ------------------------------------------------------------------------
def __cv_param_type_explored(params, params_type, param_explored):
"""
Find the type of exploring parameter
@param params A string list of parameter names
@param params_type A string list of parameter types
@param param_explored The name that CV runs through
"""
for i in range(len(params)):
if params[i] == param_explored:
return params_type[i]
return None
# ------------------------------------------------------------------------
def _validate_cv_args(schema_madlib, modelling_func, modelling_params,
modelling_params_type, param_explored,
predict_func, predict_params, predict_params_type,
metric_func, metric_params, metric_params_type,
data_tbl, data_id, id_is_random,
validation_result, n_folds, data_cols, **kwargs):
if any(arg is None for arg in
(modelling_func, modelling_params, modelling_params_type,
predict_func, predict_params, predict_params_type,
metric_func, metric_params, metric_params_type, data_tbl,
validation_result, n_folds)):
plpy.error("CV error: You have unsupported Null value(s) in arguments!")
if not table_exists(data_tbl):
plpy.error("CV error: No data table!")
if table_exists(validation_result, only_first_schema=True):
plpy.error("CV error: Output table already exists!")
if not columns_exist_in_table(data_tbl, data_cols, schema_madlib):
plpy.error("CV error: Data columns do not exist!")
    if data_id is not None and id_is_random is None:
        plpy.error("CV error: 'id_is_random' must be specified when "
                   "'data_id' is given!")
if n_folds <= 1:
plpy.error("CV error: Cross validation total fold number should "
"be larger than 1!")
    n_rows = plpy.execute("SELECT count(*) AS n_rows FROM " +
                          data_tbl)[0]["n_rows"]
    if n_rows <= n_folds:
        plpy.error("CV error: Number of rows ({0}) must be larger than the "
                   "number of folds ({1})!".format(n_rows, n_folds))
return n_rows
# ------------------------------------------------------------------------
def _create_data_tbl_id(data_tbl, data_cols, tbl_all_data, data_id,
id_is_random, col_random_id, tbl_random_id, **kwargs):
""" Ensure that the data table has a random id column by one of the following:
1. creating a copy of the data table with a random id added in
2. creating a mapping from existing id to a random id
3. use original table if it already has a random id
"""
    if data_id is None:
        # no unique ID column is given, so copy the data and create one
        __cv_copy_data_with_id(data_tbl, data_cols, tbl_all_data, col_random_id)
tbl_used = tbl_all_data
elif id_is_random:
# unique ID column is given and is random
tbl_used = data_tbl
col_random_id = data_id
else:
# the provided unique ID is not random, create a table
# mapping the given ID to a random ID
__cv_generate_random_id(data_tbl, data_id, tbl_random_id, col_random_id)
tbl_used = data_tbl
return tbl_used, col_random_id
# ----------------------------------------------------------------------
def _one_step_cv(tbl_output_model, tbl_output_pred, tbl_output_error, tbl_accum_error,
col_random_id, temp_param_explored, explore_value,
output_created, explore_type_str, data_id,
modelling_func, modelling_params, modelling_params_type, tbl_train,
predict_func, predict_params, predict_params_type, tbl_valid,
metric_func, metric_params, metric_params_type, k=0, append_k=False,
use_existing_tables=False, add_param_quotes=True, **kwargs):
"""
Args:
@param schema_madlib
Returns:
"""
    # Some modules, such as decision tree, support appending the results
    # from different cv folds to one single table to improve performance,
    # while most other modules create their own output tables in each cv fold.
if not use_existing_tables:
plpy.execute("""
DROP TABLE IF EXISTS {tbl_output_model};
DROP TABLE IF EXISTS {tbl_output_model}_summary;
DROP TABLE IF EXISTS {tbl_output_pred};
DROP TABLE IF EXISTS {tbl_output_error}
""".format(**locals()))
    # In case the functions need the cv fold number k.
    # For example, in decision tree cv, we keep appending results to the
    # same tables and use k to distinguish them.
if append_k:
# We do not want to change the original values
# that are passed into this function, so '+='
# cannot be used here. Otherwise, every call to
# this function would append a new item to the
# lists. (modelling_params += [str(k)] is wrong).
modelling_params = modelling_params + [str(k)]
modelling_params_type = modelling_params_type + ['INTEGER']
predict_params = predict_params + [str(k)]
predict_params_type = predict_params_type + ['INTEGER']
metric_params = metric_params + [str(k)]
metric_params_type = metric_params_type + ['INTEGER']
use_fold = 'WHERE k = ' + str(k)
else:
use_fold = ''
# train
__cv_funcall_general(
modelling_func, modelling_params, modelling_params_type,
tbl_train, col_random_id, temp_param_explored,
explore_value, None, tbl_output_model,
add_param_quotes=add_param_quotes)
# validation
__cv_funcall_general(
predict_func, predict_params, predict_params_type,
tbl_valid, col_random_id, temp_param_explored,
explore_value, tbl_output_model, tbl_output_pred,
add_param_quotes=add_param_quotes)
# measure the performance metric
__cv_funcall_general(
metric_func, metric_params, metric_params_type,
tbl_valid, col_random_id, temp_param_explored,
explore_value, tbl_output_pred, tbl_output_error,
add_param_quotes=add_param_quotes)
# accumulate the measured metric result
if not output_created:
plpy.execute("""
DROP TABLE IF EXISTS {tbl_accum_error};
CREATE TEMP TABLE {tbl_accum_error} as
SELECT
({explore_value}){explore_type_str} as {temp_param_explored},
{tbl_output_error}.*
FROM {tbl_output_error}
{use_fold}
""".format(**locals()))
output_created = True
else:
plpy.execute("""
INSERT INTO {tbl_accum_error}
SELECT
({explore_value}){explore_type_str} as {temp_param_explored},
{tbl_output_error}.*
FROM {tbl_output_error}
{use_fold}
""".format(**locals()))
return output_created
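# Sketch of one fold (illustrative placeholder names, default arguments):
# _one_step_cv issues three SQL calls of the form
#   SELECT <modelling_func>(... '<tbl_train>' ... '<tbl_output_model>' ...);
#   SELECT <predict_func>(... '<tbl_output_model>' ... '<tbl_output_pred>' ...);
#   SELECT <metric_func>(... '<tbl_output_pred>' ... '<tbl_output_error>' ...);
# and then appends
#   (<explore_value>)::<type> AS <param_explored>, <tbl_output_error>.*
# to tbl_accum_error, which __cv_summarize_result aggregates afterwards.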
# ----------------------------------------------------------------------
# perform cross validation
def cross_validation_general(
schema_madlib,
modelling_func, modelling_params, modelling_params_type,
param_explored, explore_values,
predict_func, predict_params, predict_params_type,
metric_func, metric_params, metric_params_type,
data_tbl, data_id, id_is_random,
validation_result, data_cols, n_folds, **kwargs):
"""
    A general framework that runs cross-validation for modules that are
    consistent with a certain SQL API.
    See the dev-doc for details.
"""
with MinWarning("warning"):
if not data_cols:
data_cols = get_cols(data_tbl)
n_rows = _validate_cv_args(**locals())
explore_type = __cv_param_type_explored(modelling_params,
modelling_params_type,
param_explored)
explore_type_str = "" if not explore_type else "::" + str(explore_type)
# all temporary names
tbl_all_data = unique_string(desp='tbl_all_data')
tbl_train = unique_string(desp='tbl_train')
tbl_valid = unique_string(desp='tbl_valid')
col_random_id = unique_string(desp='col_random_id')
tbl_random_id = unique_string(desp='tbl_random_id')
tbl_output_model = "pg_temp." + unique_string(desp='output_model')
tbl_output_pred = "pg_temp." + unique_string(desp='output_pred')
tbl_output_error = "pg_temp." + unique_string(desp='output_error')
tbl_accum_error = unique_string(desp='accum_error')
tbl_used, col_random_id = _create_data_tbl_id(**locals())
        # try to be as general as possible:
        # run cross validation with each explore_value
if not explore_values:
# if no parameter needs to be explored
# just do the data folding
temp_param_explored = unique_string()
temp_explore_values = [-1]
data_folding_only = True
else:
temp_param_explored = param_explored
temp_explore_values = explore_values
data_folding_only = False
output_created = False
modelling_params_orig = modelling_params
for k in range(n_folds):
# split data into train and validation parts
if not data_id or id_is_random:
__cv_split_data_using_id_col(tbl_used, data_cols, col_random_id,
n_rows, tbl_train, tbl_valid,
n_folds, k + 1)
else:
__cv_split_data_using_id_tbl(tbl_used, data_cols, tbl_random_id,
col_random_id, data_id, n_rows,
tbl_train, tbl_valid, n_folds, k + 1)
for explore_value in temp_explore_values:
modelling_params = modelling_params_orig[:]
_replace_explore(modelling_params, param_explored, explore_value)
output_created = _one_step_cv(**locals())
        # compute the averages and standard deviations for all measured
        # metrics (not necessarily just one)
__cv_summarize_result(tbl_accum_error, validation_result,
temp_param_explored, data_folding_only,
schema_madlib)
# clean up the temporary tables
plpy.execute("""
DROP TABLE IF EXISTS grp_key_to_cp;
DROP TABLE IF EXISTS {tbl_all_data};
DROP TABLE IF EXISTS {tbl_train};
DROP TABLE IF EXISTS {tbl_valid};
DROP TABLE IF EXISTS {tbl_random_id};
DROP TABLE IF EXISTS {tbl_output_model};
DROP TABLE IF EXISTS {tbl_output_pred};
DROP TABLE IF EXISTS {tbl_output_error};
DROP TABLE IF EXISTS {tbl_accum_error};
""".format(**locals()))
return None
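# Minimal usage sketch (hypothetical table and column names, and assuming
# SQL-level wrappers with the same argument order as the cv_linregr_*
# functions at the end of this file; 'cv_mse_metric' is a made-up metric
# function with the same argument order as cv_logregr_accuracy below):
#
#   cross_validation_general(
#       'madlib',
#       'madlib.cv_linregr_train',
#       ['%data%', 'x', 'y', '%model%'], ['VARCHAR'] * 4,
#       None, None,                       # no parameter to explore
#       'madlib.cv_linregr_predict',
#       ['%model%', '%data%', 'x', '%id%', '%prediction%'], ['VARCHAR'] * 5,
#       'cv_mse_metric',                  # hypothetical metric SQL function
#       ['%prediction%', '%data%', '%id%', 'y', '%error%'], ['VARCHAR'] * 5,
#       'my_data', None, None,            # data table, no existing ID column
#       'cv_result', None, 10)            # output table, all columns, 10 folds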
# ------------------------------------------------------------------------
# XXX Currently, this function is built specifically for the decision tree,
# which appends the outputs from different cv folds to a single
# table to reduce catalog changes and thus improve the performance.
# FIXME: this should either be made general enough or moved to decision_tree.py_in
def cross_validation_grouping_w_params(
schema_madlib,
modelling_func, modelling_params, modelling_params_type,
predict_func, predict_params, predict_params_type,
metric_func, metric_params, metric_params_type,
group_to_param_list_table, param_list_name, grouping_cols,
data_tbl, data_id, id_is_random, validation_result, param_explored,
data_cols, n_folds, **kwargs):
"""
A general framework that runs cross-validation for modules that are
consistent with certain SQL API.
See dev-doc for details.
"""
with MinWarning("warning"):
if not data_cols:
data_cols = get_cols(data_tbl)
n_rows = _validate_cv_args(**locals())
explore_type_str = "::INTEGER"
# all temporary names
tbl_all_data = unique_string()
tbl_train = unique_string()
tbl_valid = unique_string()
col_random_id = unique_string()
tbl_random_id = unique_string()
tbl_output_model = "pg_temp." + unique_string()
tbl_output_pred = "pg_temp." + unique_string()
tbl_output_error = "pg_temp." + unique_string()
tbl_accum_error = unique_string()
grp_to_param_tbl = unique_string()
temp_param_explored = unique_string()
tbl_used, col_random_id = _create_data_tbl_id(**locals())
output_created = False
grouping_col_str = "" if not grouping_cols else grouping_cols + ','
plpy.execute("DROP TABLE IF EXISTS {0}".format(grp_to_param_tbl))
plpy.execute("""CREATE TEMP TABLE {tbl} AS
SELECT {grouping_col_str}
{param_list_name}[1] as explore_value
FROM {group_to_param_list_table}
LIMIT 0
""".format(tbl=grp_to_param_tbl,
grouping_col_str=grouping_col_str,
param_list_name=param_list_name,
group_to_param_list_table=group_to_param_list_table))
        # pass a fresh unique string as param_explored so that only the
        # "%explore%" placeholder in modelling_params matches and is
        # replaced by the group-to-param table name
        _replace_explore(modelling_params, unique_string(),
                         "'" + grp_to_param_tbl + "'")
# get the maximum size of the param list among all groups
max_len = plpy.execute("""
SELECT max(array_upper({0}, 1)) as max_len
FROM {1}
""".format(param_list_name, group_to_param_list_table))[0]["max_len"]
for k in range(n_folds):
# split data into train and validation parts
if not data_id or id_is_random:
__cv_split_data_using_id_col(tbl_used, data_cols, col_random_id,
n_rows, tbl_train, tbl_valid,
n_folds, k + 1)
else:
__cv_split_data_using_id_tbl(tbl_used, data_cols, tbl_random_id,
col_random_id, data_id, n_rows,
tbl_train, tbl_valid, n_folds, k + 1)
for explore_value in range(1, max_len + 1):
plpy.execute("TRUNCATE TABLE {0}".format(grp_to_param_tbl))
query = """
INSERT INTO {grp_to_param_tbl}
SELECT {grouping_col_str}
(CASE WHEN {explore_value} <= array_upper({param_list_name}, 1)
THEN {param_list_name}[{explore_value}]
ELSE {param_list_name}[array_upper({param_list_name}, 1)]
END) as explore_value
FROM {group_to_param_list_table}
""".format(**locals())
plpy.execute(query)
# XXX Decision tree uses existing tables to store the outputs
# from all cv folds.
use_existing_tables = True
                # XXX To reduce the number of truncations on the existing
                # tables, including the model table and the error table, we
                # just append new results to them together with the fold ID.
                # XXX The prediction table is too big, so we have to use
                # table truncation to speed up the search in it.
                append_k = True  # k is the fold ID distinguishing sets of results
                # XXX It is important not to add single quotes around the
                # parameters; this is special handling for decision tree.
                add_param_quotes = False
output_created = _one_step_cv(**locals())
# compute the averages and standard deviations of cv_error for each group and explore value
group_using_str = ('' if not grouping_cols
else ("USING (" + grouping_cols + ")"))
sql_create_validation_result = """
CREATE TABLE {validation_result} AS
SELECT
{grouping_col_str}
{param_list_name}[{temp_param_explored}] AS {param_explored},
avg(cv_error) AS cv_error_avg,
stddev(cv_error) AS cv_error_stddev
FROM
{tbl_accum_error}
NATURAL JOIN
{group_to_param_list_table}
WHERE {temp_param_explored} <= array_upper({param_list_name}, 1)
GROUP BY {grouping_col_str} {temp_param_explored}, {param_list_name}
""".format(**locals())
plpy.execute(sql_create_validation_result)
plpy.execute("""
DROP TABLE IF EXISTS {grp_to_param_tbl};
DROP TABLE IF EXISTS {tbl_all_data};
DROP TABLE IF EXISTS {tbl_train};
DROP TABLE IF EXISTS {tbl_valid};
DROP TABLE IF EXISTS {tbl_random_id};
DROP TABLE IF EXISTS {tbl_output_model};
DROP TABLE IF EXISTS {tbl_output_pred};
DROP TABLE IF EXISTS {tbl_output_error};
DROP TABLE IF EXISTS {tbl_accum_error};
""".format(**locals()))
return None
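# The resulting {validation_result} table has one row per group and explored
# value, e.g. (hypothetical values, with grouping_cols = 'grp' and
# param_explored = 'max_depth'):
#   grp | max_depth | cv_error_avg | cv_error_stddev
#   ----+-----------+--------------+-----------------
#   a   |         3 |         0.21 |            0.04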
# ----------------------------------------------------------------------
def cv_linregr_train(schema_madlib, tbl_source, col_ind_var,
col_dep_var, tbl_result, **kwargs):
"""
A wrapper for linear regression to be used in general CV
"""
plpy.execute("""
create table {tbl_result} as
select ({schema_madlib}.linregr({col_dep_var}, {col_ind_var})).*
from {tbl_source}
""".format(schema_madlib=schema_madlib,
tbl_source=tbl_source,
tbl_result=tbl_result,
col_ind_var=col_ind_var,
col_dep_var=col_dep_var))
return None
# ------------------------------------------------------------------------
def cv_linregr_predict(schema_madlib, tbl_model, tbl_newdata, col_ind_var,
col_id, tbl_predict, **kwargs):
"""
A wrapper function for linear prediction for CV
"""
plpy.execute("""
create table {tbl_predict} as
select
{col_id},
{schema_madlib}.linregr_predict(
coef,
{col_ind_var}) as prediction
from
{tbl_model}, {tbl_newdata}
""".format(**locals()))
# ------------------------------------------------------------------------
def cv_logregr_predict(schema_madlib, tbl_model, tbl_newdata, col_ind_var,
col_id, tbl_predict, **kwargs):
"""
A wrapper function for logistic prediction for CV
"""
plpy.execute("""
create table {tbl_predict} as
select
{col_id},
{schema_madlib}.logregr_predict(
coef,
{col_ind_var}) as prediction
from
{tbl_model}, {tbl_newdata}
""".format(**locals()))
# ------------------------------------------------------------------------
def cv_logregr_accuracy(schema_madlib, tbl_predict, tbl_source, col_id,
col_dep_var, tbl_accuracy, **kwargs):
"""
A wrapper function for logistic accuracy for CV
"""
plpy.execute("""
create table {tbl_accuracy} as
select
avg(case when
{tbl_predict}.prediction = {tbl_source}.{col_dep_var}
then 1
else 0
end) as accuracy
from
{tbl_predict}, {tbl_source}
where {tbl_predict}.{col_id} = {tbl_source}.{col_id}
""".format(**locals()))