import math
import plpy
from utilities.utilities import __mad_version
## ========================================================================
version_wrapper = __mad_version()
mad_vec = version_wrapper.select_vecfunc()
## ========================================================================
def __cv_produce_col_name_string(tbl, cols, exclude_cols=None):
"""
Given a list of column names, qualify each with the table name and join
them into a single comma-separated string, so that only the necessary
columns are selected when copying the data is unavoidable.
@param tbl Name of the table that contains the columns
@param cols A string list of column names
@param exclude_cols An optional string list of column names to leave out
"""
if not exclude_cols:
exclude_cols = []
rst = [tbl + "." + col for col in cols if col not in exclude_cols]
return ','.join(rst)
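# Illustrative example (the table and column names here are hypothetical):
#   __cv_produce_col_name_string("houses", ["price", "size", "id"], ["id"])
#   returns "houses.price,houses.size"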
## ========================================================================
def __cv_copy_data_with_id(rel_origin, col_data, rel_copied, random_id):
"""
If the user does not provide an ID column, the data table has to be
copied, creating a random ID column alongside it.
@param rel_origin Original table name, a string
@param col_data A string list of columns that contain needed data
@param rel_copied Name of the table that copies the data from the original table
@param random_id Column name for the random unique ID in the newly created copied table
"""
# We want to select only the columns that will be used in the computation.
col_string = __cv_produce_col_name_string(rel_origin, col_data)
__cv_copy_data_with_id_compute(rel_origin, col_string, rel_copied, random_id)
return None
## ========================================================================
def __cv_copy_data_with_id_compute(rel_origin, col_string, rel_copied, random_id):
# Fix the random seed so the shuffling is reproducible, then attach a
# random row number as the unique ID column.
plpy.execute("""
select setseed(0.5);
drop table if exists {rel_copied};
create temp table {rel_copied} as
select
row_number() over (order by random()) as {random_id},
{col_string}
from {rel_origin}
""".format(rel_origin=rel_origin, rel_copied=rel_copied,
random_id=random_id, col_string=col_string))
return None
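# Illustrative example (names are hypothetical): with rel_origin = "houses"
# and col_string = "houses.price,houses.size", the statement above creates a
# temp table with columns (<random_id>, price, size), where <random_id> holds
# a random permutation of 1..N for a table of N rows.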
## ========================================================================
def __cv_summarize_result(tbl_accum_error, tbl_result, param_explored,
data_folding_only, schema_madlib):
"""
Summarize the result. For each metric column measured during cross
validation, compute the average and the standard deviation. For ridge and
lasso there is only one error column, but other modules may produce
multiple metric values. "_avg" and "_stddev" are appended to the names of
the generated columns.
@param tbl_accum_error Name of the table that accumulates all metrics measured in CV
@param tbl_result Name of the table that will store the summarized result
@param param_explored Name of the parameter that CV runs through, a string
@param data_folding_only If True, drop the parameter column from the result table
@param schema_madlib Name of the MADlib schema
"""
agg = version_wrapper.select_array_agg(schema_madlib)
cols = plpy.execute("""
select {agg}(column_name::varchar) as col
from information_schema.columns
where table_name = '{tbl_accum_error}'
""".format(tbl_accum_error=tbl_accum_error,
agg=agg))[0]["col"]
cols = mad_vec(cols)
kwargs = dict(tbl_accum_error=tbl_accum_error,
tbl_result=tbl_result,
param_explored=param_explored)
# Collect the metric columns (everything except the explored parameter).
ecols = [col for col in cols if col != param_explored]
# Build the select list: the parameter plus avg/stddev for each metric.
col_string = "{param_explored}, ".format(**kwargs)
col_string += ", ".join("avg({col}) as {col}_avg, stddev({col}) as {col}_stddev".format(col=col) for col in ecols)
plpy.execute("""
drop table if exists {tbl_result};
create table {tbl_result} as
select {col_string}
from {tbl_accum_error}
group by {param_explored}
order by {param_explored}
""".format(col_string=col_string, **kwargs))
if data_folding_only:
plpy.execute("alter table " + tbl_result + " drop column " + param_explored)
return None
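# Illustrative example (names are hypothetical): if tbl_accum_error has the
# columns (lambda, mse), the summary table has the columns
# (lambda, mse_avg, mse_stddev), with one row per explored value of lambda.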
## ========================================================================
def __cv_generate_random_id(tbl_origin, col_id, tbl_random_id, random_id):
"""
Create a random ID column for a given data table.
If the user provides an ID column, which can uniquely identify the rows,
create a table which maps the row ID to a random integer, which is
then used as the ID when splitting data.
If the ID provided by the user is already random, there is no need to
call this function.
@param tbl_origin Name of the original data table
@param col_id Name of the original unique ID column
@param tbl_random_id Name of the table that maps the original non-random ID to
a random ID. It has two columns, one for the original ID
and the other for the newly created random ID.
@param random_id Name of the column that contains the newly created random ID
"""
plpy.execute("""
drop table if exists {tbl_random_id};
create temp table {tbl_random_id} as
select
row_number() over (order by random()) as {random_id},
{col_id}
from {tbl_origin}
""".format(tbl_origin=tbl_origin,
col_id=col_id,
tbl_random_id=tbl_random_id,
random_id=random_id))
return None
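# Illustrative example (names are hypothetical): for a table "houses" with a
# unique but non-random column "id", the mapping table pairs every "id" value
# with a random row number, e.g. rows like (random_id=1, id=42),
# (random_id=2, id=7), and so on.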
## ========================================================================
def __cv_validation_rows(row_num, n_folds, which_fold):
"""
Compute the start and end rows of each validation fold.
A validation data set corresponds to the half-open range [start_row, end_row).
Since the ID column is already random, a consecutive range of IDs is
enough to extract a good sample of validation data.
To ensure evenly sized folds, the fold sizes are computed as follows:
with N rows and k folds, let
r = N mod k (the remainder of N divided by k)
be the number of surplus rows. The first r folds have floor(N/k) + 1 rows
each, and the remaining folds have floor(N/k) rows.
@param row_num Total number of rows, N
@param n_folds Number of folds, k
@param which_fold Index of the validation fold, counted from 1
"""
r = row_num % n_folds
fold_size = int(math.floor(row_num * 1.0 / n_folds))
if which_fold > r:
start_row = r * (fold_size + 1) + (which_fold - r - 1) * fold_size + 1
end_row = start_row + fold_size
else:
start_row = (which_fold - 1) * (fold_size + 1) + 1
end_row = start_row + (fold_size + 1)
return (start_row, end_row)
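# Worked example (illustrative): with row_num = 10 and n_folds = 4, we get
# r = 2 and floor(10/4) = 2, so the fold sizes are 3, 3, 2, 2:
#   __cv_validation_rows(10, 4, 1) == (1, 4)   # rows 1..3
#   __cv_validation_rows(10, 4, 2) == (4, 7)   # rows 4..6
#   __cv_validation_rows(10, 4, 3) == (7, 9)   # rows 7..8
#   __cv_validation_rows(10, 4, 4) == (9, 11)  # rows 9..10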
## ========================================================================
def __cv_split_data_using_id_col(rel_source, col_data, col_id, row_num,
rel_train, rel_valid, n_folds, which_fold):
"""
Split the data into training and validation sets, using a random ID column
that either existed in the original data or was created during copying.
@param rel_source Name of data source table
@param col_data A string list of data columns
@param col_id Name of the unique ID column
@param row_num Total number of rows
@param rel_train Name of training data table
@param rel_valid Name of validation data table
@param n_folds Number of cross-validation folds
@param which_fold Which fold is used as the validation set
"""
col_string = __cv_produce_col_name_string(rel_source, col_data, [col_id])
__cv_split_data_using_id_col_compute(rel_source, col_string, col_id, row_num,
rel_train, rel_valid, n_folds, which_fold)
return None
## ========================================================================
def __cv_split_data_using_id_col_compute(rel_source, col_string, col_id, row_num,
rel_train, rel_valid, n_folds, which_fold):
(start_row, end_row) = __cv_validation_rows(row_num, n_folds, which_fold)
kwargs = dict(rel_train=rel_train, rel_source=rel_source,
col_id=col_id, start_row=start_row,
rel_valid=rel_valid,
end_row=end_row, col_string=col_string)
# Extract the training part of data,
# which corresponds to rows outside of [start_row, end_row).
# Extract the validation part of data,
# which corresponds to rows inside of [start_row, end_row).
sql = """
drop table if exists {rel_train};
create temp table {rel_train} as
select {col_id}, {col_string} from {rel_source}
where {col_id} < {start_row}
or {col_id} >= {end_row};
drop table if exists {rel_valid};
create temp table {rel_valid} as
select {col_id}, {col_string} from {rel_source}
where {col_id} >= {start_row}
and {col_id} < {end_row}
""".format(**kwargs)
plpy.execute(sql)
return None
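# Illustrative example: continuing the worked example above, for
# (start_row, end_row) = (4, 7) the validation table holds the rows with
# 4 <= id < 7, and the training table holds all remaining rows.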
## ========================================================================
def __cv_split_data_using_id_tbl(rel_origin, col_data, rel_random_id,
random_id, origin_id, row_num, rel_train,
rel_valid, n_folds, which_fold):
"""
Split the data table using a random ID mapping table.
A unique ID column exists in the original table, but its values are not
randomly assigned, so a table that maps this non-random ID to a truly
random ID has been created by __cv_generate_random_id.
@param rel_origin Name of the original data table
@param col_data A string list of data columns
@param rel_random_id Name of the random ID mapping table
@param random_id Name of random ID column in the table rel_random_id
@param origin_id Name of the original non-random ID column in rel_origin and rel_random_id
@param row_num Total number of rows
@param rel_train Name of training data table
@param rel_valid Name of validation data table
@param n_folds Number of cross-validation folds
@param which_fold Which fold is used as the validation set
"""
col_string = __cv_produce_col_name_string(rel_origin, col_data)
__cv_split_data_using_id_tbl_compute(rel_origin, col_string, rel_random_id,
random_id, origin_id, row_num, rel_train,
rel_valid, n_folds, which_fold)
return None
## ========================================================================
def __cv_split_data_using_id_tbl_compute(rel_origin, col_string, rel_random_id,
random_id, origin_id, row_num, rel_train,
rel_valid, n_folds, which_fold):
(start_row, end_row) = __cv_validation_rows(row_num, n_folds, which_fold)
# Extract the training part of data,
# which corresponds to rows outside of [start_row, end_row).
# Extract the validation part of data,
# which corresponds to rows inside of [start_row, end_row).
sql = """
drop table if exists {rel_train};
create temp table {rel_train} as
select {rel_random_id}.{random_id}, {col_string}
from {rel_origin}, {rel_random_id}
where
{rel_origin}.{origin_id} = {rel_random_id}.{origin_id}
and (
{rel_random_id}.{random_id} < {start_row}
or
{rel_random_id}.{random_id} >= {end_row});
drop table if exists {rel_valid};
create temp table {rel_valid} as
select {rel_random_id}.{random_id}, {col_string}
from {rel_origin}, {rel_random_id}
where
{rel_origin}.{origin_id} = {rel_random_id}.{origin_id}
and (
{rel_random_id}.{random_id} >= {start_row}
and
{rel_random_id}.{random_id} < {end_row})
""".format(**locals())
plpy.execute(sql)
return None