# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import plpy
import re
import pandas as pd
import xgboost as xgb
import numpy
import cPickle as pickle
import zlib
import ast
import collections
import itertools
from bisect import bisect_left
from operator import itemgetter
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import confusion_matrix
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
from utilities.utilities import _assert
from utilities.utilities import add_postfix
from utilities.utilities import unique_string
from utilities.validate_args import get_cols
from utilities.validate_args import get_expr_type
from utilities.validate_args import input_tbl_valid
from utilities.validate_args import output_tbl_valid
from utilities.validate_args import cols_in_tbl_valid
def serialize_pandas_dframe_as_bytea(schema_madlib, source_table, id_column,
class_label, features):
"""
    Load the data from the database and compress it so it fits in a single bytea cell.
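    The payload round-trips with pickle.loads(zlib.decompress(compressed)),
    which is how xgboost_train below recovers the DataFrame.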
"""
mdl_train_sql = """
SELECT
{id_column},
{features},
{class_label}
FROM
{source_table}
""".format(**locals())
result = plpy.execute(mdl_train_sql)
df = pd.DataFrame.from_records(result)
df_filtered = df.dropna(axis=1, how='all')
compressed = zlib.compress(pickle.dumps(df_filtered))
return compressed
def print_prec_rec_fscore_support(mat, metric_labels, class_label, class_values):
"""
    Format precision, recall, fscore & support as a pandas DataFrame, one row per class value.
"""
tbl = pd.DataFrame(mat, columns=metric_labels)
tbl[class_label] = class_values
tbl = tbl[[class_label]+metric_labels]
return tbl
def takeClosest(myList, myNumber):
"""
Assumes myList is sorted. Returns closest value to myNumber.
If two numbers are equally close, return the smallest number.
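
    For example:
    >>> takeClosest([0.1, 0.5, 0.9], 0.45)
    0.5
    >>> takeClosest([0.1, 0.5, 0.9], 0.3)  #equidistant: the smaller value wins
    0.1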
"""
pos = bisect_left(myList, myNumber)
if pos == 0:
return myList[0]
if pos == len(myList):
return myList[-1]
before = myList[pos - 1]
after = myList[pos]
if after - myNumber < myNumber - before:
return after
else:
return before
def expand_grid(params):
    """
    Expand the params dict into the cross product of all supplied values, for grid search.
    """
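    #A hypothetical input such as {'max_depth': [4, 6], 'n_estimators': [100]} expands to
    #[('max_depth=4', 'n_estimators=100'), ('max_depth=6', 'n_estimators=100')]
    #(tuple ordering follows dict iteration order, so it may vary)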
params_list = []
for key, val in params.items():
        #If the supplied param value is a list of values, expand it out;
        #guard against strings, which are also iterable
        if val and isinstance(val, collections.Iterable) and not isinstance(val, basestring):
            r = ["{k}={v}".format(k=key, v=v) for v in val]
        else:
            r = ["{k}={v}".format(k=key, v=val)]
params_list.append(r)
    params_grid = list(itertools.product(*params_list))
return params_grid
def try_literal_eval(t):
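    """
    Parse t as a Python literal, falling back to the raw string on failure,
    e.g. '0.3' -> 0.3 and 'auc' -> 'auc' (hypothetical inputs).
    """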
try:
ret = ast.literal_eval(t)
except Exception:
ret = t
return ret
def xgboost_train(schema_madlib, dframe, features_all, class_label, params,
class_weights, train_set_size, id_column, train_set_split_var):
"""
Run a single xgboost workload.
- Load the data
- Split train and test data for scoring
- Train xgboost
- Calculate metrics to report
"""
df = pickle.loads(zlib.decompress(dframe))
    #Append id_column so the ids ride along as the last column of the feature matrix
    features_all.append(id_column)
    features = filter(lambda x: x in df.columns, features_all)
X = df[features].as_matrix()
y = df[class_label]
class_list = numpy.unique(y).tolist()
    if not train_set_split_var or train_set_split_var == 'None': #the split var can arrive as the string 'None'
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=float(1-train_set_size))
        #Hold the test set size constant (at most 20%) as the train set size
        #varies, so metrics stay comparable across runs with different train sizes
        test_set_size = min((1 - train_set_size), 0.2)
X_test = X_test[range(0,int(len(y)*test_set_size)),]
y_test = y_test.head(int(len(y)*test_set_size))
else:
        #Locate and drop the split-indicator column so it is not used as a feature
        split_indx = numpy.where(numpy.array(features) == train_set_split_var)[0]
        X = numpy.delete(X, split_indx, 1)
        #Keep the features list aligned with the remaining columns of X
        features = [f for f in features if f != train_set_split_var]
        X_train = X[numpy.array(df[train_set_split_var] == 1), ]
        X_test = X[numpy.array(df[train_set_split_var] == 0), ]
        y_train = y[numpy.array(df[train_set_split_var] == 1)]
        y_test = y[numpy.array(df[train_set_split_var] == 0)]
    #The id_column was appended as the last feature column: save the test-set ids
    #for the output, then drop the id column from the matrices and the features list
    test_ids = X_test[:, len(features) - 1]
    X_train = numpy.delete(X_train, len(features) - 1, 1)
    X_test = numpy.delete(X_test, len(features) - 1, 1)
    features = features[0:len(features) - 1]
class_list_y_train = numpy.unique(y_train).tolist()
class_list_y_test = numpy.unique(y_test).tolist()
if (class_list != class_list_y_train) or (class_list != class_list_y_test):
plpy.error("Train test split caused a subset with missing classes.")
    #Compute per-sample weights
    sample_representation = y_train.value_counts()
    total_samples = sum(sample_representation)
    sample_weights = None
    if not class_weights:
        #Default: balanced class weights, normalized to sum to 1 across classes
        normalizer = sum([total_samples * 1.0 / sample_representation[c]
                          for c in sample_representation.keys()])
        sample_weights = map(
            lambda s: (total_samples * 1.0 / sample_representation[s]) / normalizer,
            y_train
        )
else:
#User-supplied class-weights
class_weights_dict = ast.literal_eval(re.sub("[\\t]","",class_weights).strip())
sample_weights = map(lambda s: class_weights_dict[s], y_train)
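        #e.g. class_weights = "{0: 1.0, 1: 3.0}" (hypothetical) weighs every
        #class-1 sample three times as heavily as a class-0 sample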
    #Parse the parameter list into kwargs for the XGBoost classifier
    p_list = [p.split('=') for p in ast.literal_eval(re.sub("[\\t]", "", params).strip())]
    params_dict = dict([(k, try_literal_eval(v.strip())) for k, v in p_list])
    eval_metric = params_dict.pop('eval_metric', 'auc')
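    #e.g. a params row of ('max_depth=6', 'learning_rate=0.3') (hypothetical)
    #parses to {'max_depth': 6, 'learning_rate': 0.3}, with eval_metric
    #falling back to 'auc' when absent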
gbm = xgb.XGBClassifier(**params_dict)
#Fit model
gbm.fit(
X_train,
y_train,
eval_metric = eval_metric,
sample_weight = sample_weights
)
    #Score the train and test sets and compute the reporting metrics
y_pred_train = gbm.predict(X_train)
y_pred_test = gbm.predict(X_test)
cmat_train = confusion_matrix(y_train, y_pred_train)
cmat_test = confusion_matrix(y_test, y_pred_test)
scores = numpy.array(precision_recall_fscore_support(y_test, y_pred_test)).transpose()
metric_labels = ['precision', 'recall', 'fscore', 'support']
model_metrics = print_prec_rec_fscore_support(scores, metric_labels, class_label, gbm.classes_)
#Calculate feature importance scores
importance = gbm._Booster.get_fscore()
if len(importance) == 0:
plpy.error("No importance found for any feature")
fnames_importances = sorted(
[(features[int(k.replace('f',''))], importance[k]) for k in importance],
key=itemgetter(1),
reverse=True
)
fnames, f_importance_scores = zip(*fnames_importances)
important_features = pd.DataFrame(fnames_importances)
return (features, pickle.dumps(gbm), params, fnames, f_importance_scores,
model_metrics.iloc[:,1].values.tolist(), model_metrics.iloc[:,2].values.tolist(),
model_metrics.iloc[:,3].values.tolist(),model_metrics.iloc[:,4].values.tolist(),
test_ids)
def xgboost_grid_search(schema_madlib, source_table, id_column, class_label,
list_of_features, list_of_features_to_exclude,
params_str, grid_search_results_tbl, class_weights,
train_set_size, train_set_split_var):
"""
Run multiple xgboost workloads in parallel via grid search.
"""
input_tbl_valid(source_table, 'XGBoost')
cols_in_tbl_valid(source_table, [id_column, class_label], 'XGBoost')
    if train_set_split_var is not None:
        cols_in_tbl_valid(source_table, [train_set_split_var], 'XGBoost')
output_tbl_valid(grid_search_results_tbl, 'XGBoost')
grid_search_results_tbl_summary = add_postfix(grid_search_results_tbl, '_summary')
output_tbl_valid(grid_search_results_tbl_summary, 'XGBoost')
if list_of_features.strip() == '*':
        #Use every column of the source table as a feature, minus the exclusions
if list_of_features_to_exclude is None:
list_of_features_to_exclude = []
discard_features = list_of_features_to_exclude + [class_label, id_column]
features = [col for col in get_cols(source_table) if col not in discard_features]
list_of_features = ','.join(features)
else:
features = [f.strip() for f in list_of_features.split(',')]
cols_in_tbl_valid(source_table, features, 'XGBoost')
class_weights = '' if class_weights is None else class_weights
if not params_str:
params_str = """
{
                'learning_rate': [0.3], #Weight shrinkage (eta); for smaller values, increase n_estimators
                'max_depth': [6], #Larger values may overfit
                'n_estimators':[100], #More estimators, lower variance (better fit on the test set)
'eval_metric':['auc']
}
"""
params = ast.literal_eval(re.sub("[\\t]","",params_str).strip())
params_grid = expand_grid(params)
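    #With the default params above the grid has a single combination; a list such
    #as 'max_depth': [4, 6, 8] (hypothetical) would expand the grid threefold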
#Save each parameter list in the grid as a row in a distributed table
grid_search_params_temp_tbl = unique_string('grid_params')
grid_search_params_temp_tbl_df = unique_string('grid_params_df')
dist_str = "m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (params_index)')"
sql = """
CREATE TEMP TABLE {grid_search_params_temp_tbl}
(
params_index int,
params text
) {dist_str}
""".format(**locals())
plpy.execute(sql)
sql = """
INSERT INTO {grid_search_params_temp_tbl}
VALUES ({params_index}, $X${val}$X$);
"""
for indx, val in enumerate(params_grid):
plpy.execute(
sql.format(
val=val,
                params_index=indx + 1, #SQL indices are 1-based, so keep params_index consistent
grid_search_params_temp_tbl=grid_search_params_temp_tbl
)
)
grid_size = len(params_grid)
sql = """
CREATE TEMP TABLE {grid_search_params_temp_tbl_df}
AS
(
SELECT
df,
generate_series(1, {grid_size}) AS params_index
FROM
(
SELECT
{schema_madlib}.__serialize_pandas_dframe_as_bytea__(
'{source_table}',
'{id_column}',
'{class_label}',
'{list_of_features}'
) AS df
)q
) {dist_str}
""".format(**locals())
plpy.execute(sql)
    #Invoke XGBoost training once per row of the parameter table; the rows run in parallel on a distributed database
    grid_search_results_temp_tbl = unique_string('results_tbl')
    features_str = str(features).replace('[', '').replace(']', '').replace(',', ',\n')
sql = """
CREATE TEMP TABLE {grid_search_results_temp_tbl}
AS
(
SELECT
{schema_madlib}.__xgboost_train_parallel__(
df,
ARRAY[
{features_str}
],
'{class_label}',
params,
$CW${class_weights}$CW$,
{train_set_size},
'{id_column}',
'{train_set_split_var}'
) AS mdl_results,
t1.params_index
FROM
{grid_search_params_temp_tbl} t1,
{grid_search_params_temp_tbl_df} t2
WHERE
t1.params_index = t2.params_index
) {dist_str}
""".format(**locals())
plpy.execute(sql)
sql = """
CREATE TABLE {grid_search_results_tbl_summary}
AS
(
SELECT
now() AS mdl_train_ts,
'{source_table}'||'_xgboost' AS mdl_name,
(mdl_results).features,
(mdl_results).params,
(mdl_results).fnames,
(mdl_results).importance,
(mdl_results).precision,
(mdl_results).recall,
(mdl_results).fscore,
(mdl_results).support,
(mdl_results).test_ids,
params_index
FROM
{grid_search_results_temp_tbl}
) {dist_str}
""".format(**locals())
plpy.execute(sql)
sql = """
CREATE TABLE {grid_search_results_tbl}
AS
(
SELECT
(mdl_results).mdl AS model,
(mdl_results).features,
params_index
FROM
{grid_search_results_temp_tbl}
) {dist_str}
""".format(**locals())
plpy.execute(sql)
plpy.execute("""
DROP TABLE {grid_search_params_temp_tbl};
DROP TABLE {grid_search_params_temp_tbl_df};
DROP TABLE {grid_search_results_temp_tbl};
""".format(**locals()))
def xgboost_predict(schema_madlib, scoring_tbl, mdl_table, mdl_output_tbl,
id_column, class_label, params_index):
"""
Predict using an xgboost model. Also generate metrics and roc curve tables.
"""
input_tbl_valid(scoring_tbl, 'XGBoost')
cols_in_tbl_valid(scoring_tbl, [id_column], 'XGBoost')
if class_label:
cols_in_tbl_valid(scoring_tbl, [class_label], 'XGBoost')
input_tbl_valid(mdl_table, 'XGBoost')
output_tbl_valid(mdl_output_tbl, 'XGBoost')
mdl_output_tbl_metrics = add_postfix(mdl_output_tbl, '_metrics')
mdl_output_tbl_roc_curve = add_postfix(mdl_output_tbl, '_roc_curve')
output_tbl_valid(mdl_output_tbl_metrics, 'XGBoost')
output_tbl_valid(mdl_output_tbl_roc_curve, 'XGBoost')
id_type = get_expr_type(id_column, scoring_tbl)
#Load the serialized XGBoost model from the table
mdl_sql = """
SELECT
model,
features
FROM
{mdl_table}
WHERE params_index = {params_index}
""".format(**locals())
result = plpy.execute(mdl_sql)
model = result[0]['model']
features = result[0]['features']
    #Deserialize the trained gradient-boosted-trees model
gbm = pickle.loads(model)
#Fetch features from test dataset for scoring
if isinstance(features, list):
features_str = ','.join(features)
else:
features_str = features
features = [features]
comma_class_label = ', {0}'.format(class_label) if class_label else ''
mdl_score_sql = """
SELECT
{id_column},
{features_str}
{comma_class_label}
FROM
{scoring_tbl}
""".format(**locals())
result = plpy.execute(mdl_score_sql)
df = pd.DataFrame.from_records(result)
X_test = df[features]
y_test = df[class_label] if class_label else None
#Score the test set
y_pred_test = gbm.predict(X_test.as_matrix())
y_pred_proba_test = gbm.predict_proba(X_test.as_matrix())
    if class_label:
cmat_test = confusion_matrix(y_test, y_pred_test)
scores = numpy.array(precision_recall_fscore_support(y_test, y_pred_test)).transpose()
metric_labels = ['precision', 'recall', 'fscore', 'support']
model_metrics = print_prec_rec_fscore_support(scores, metric_labels, class_label, gbm.classes_)
else:
model_metrics = 'NA'
predicted_class_label = class_label+'_predicted' if class_label else 'class_label_predicted'
predicted_class_proba_label = class_label+'_proba_predicted' if class_label else 'class_label_proba_predicted'
pred = pd.Series(y_pred_test, index = X_test.index).to_frame(predicted_class_label)
    num_unique_classes = len(pred[predicted_class_label].unique())
    if num_unique_classes == 1:
        plpy.error('XGBoost: Every prediction is of the same class.')
    #The probability matrix has one column per model class
    n_proba_cols = y_pred_proba_test.shape[1]
    pred_proba = pd.DataFrame(data=y_pred_proba_test,
                              index=X_test.index,
                              columns=range(n_proba_cols))
    res_df = pd.concat([df[id_column], pred, pred_proba], axis=1).set_index(X_test.index)
    #Combine all the class probabilities into a single array-literal column
    res_df['all_class_probas'] = '{' + res_df[0].map(str)
    for class_col in range(1, n_proba_cols):
        res_df['all_class_probas'] = res_df['all_class_probas'] + ',' + res_df[class_col].map(str)
    res_df['all_class_probas'] = res_df['all_class_probas'] + '}'
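    #e.g. a two-class row with probabilities 0.12 and 0.88 (hypothetical values)
    #becomes the text '{0.12,0.88}', which casts cleanly to FLOAT8[]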
#Feature importance scores
importance = gbm._Booster.get_fscore()
if len(importance) > 1:
fnames_importances = sorted(
[(features[int(k.replace('f',''))], importance[k]) for k in importance],
key=itemgetter(1),
reverse=True
)
fnames, f_importances = zip(*fnames_importances)
fnames = str(fnames).replace('(','{').replace(')','}').replace('\'','\"')
f_importances = str(f_importances).replace('(','{').replace(')','}').replace('\'','\"')
    else:
        #Single-feature model: XGBoost names the lone feature 'f0'
        fnames = "{{{0}}}".format(features[0])
        f_importances = "{{{0}}}".format(importance['f0'])
ret_dict = res_df.to_dict('records')
ret_result = (
(
r[id_column],
r[predicted_class_label],
r['all_class_probas']
)
for r in ret_dict
)
    #Create an ROC curve when scoring a set that has the class label
    if class_label:
class_list = numpy.unique(y_test)
roc_auc_scores, fpr, tpr, thresholds = [],[],[],[]
for index, classname in enumerate(class_list):
roc_auc_scores.append(roc_auc_score(numpy.array(y_test)==classname,y_pred_proba_test[:,index]))
t_fpr, t_tpr, t_thresholds = roc_curve(numpy.array(y_test),y_pred_proba_test[:,index],pos_label=classname)
fpr.append(t_fpr)
tpr.append(t_tpr)
thresholds.append(t_thresholds)
fpr_df = pd.DataFrame(fpr).transpose()
tpr_df = pd.DataFrame(tpr).transpose()
thresholds_df = pd.DataFrame(thresholds).transpose()
else:
roc_auc_scores = [0]
#Create a table to hold the unit-level results
sql = """
CREATE TABLE {mdl_output_tbl}
(
{id_column} {id_type},
{predicted_class_label} TEXT,
{predicted_class_proba_label} FLOAT8[]
)
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY ({id_column})');
""".format(**locals())
plpy.execute(sql)
sql = """
INSERT INTO {mdl_output_tbl} VALUES
""".format(**locals())
for row in ret_result:
sql = sql + """
{0},""".format(row)
sql = sql[:-1]
plpy.execute(sql)
#Create a table for holding the metrics and feature importances
sql = """
CREATE TABLE {mdl_output_tbl_metrics}
(
precision DOUBLE PRECISION[],
recall DOUBLE PRECISION[],
fscore DOUBLE PRECISION[],
support DOUBLE PRECISION[],
roc_auc_scores DOUBLE PRECISION[],
feature_names TEXT[],
feature_importance_scores DOUBLE PRECISION[]
)
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED RANDOMLY');
""".format(**locals())
plpy.execute(sql)
    #Generate metrics for output
    if class_label:
precision = str(model_metrics.iloc[:,1].values.tolist()).replace('[','{').replace(']','}').replace('\'','\"')
recall = str(model_metrics.iloc[:,2].values.tolist()).replace('[','{').replace(']','}').replace('\'','\"')
fscore = str(model_metrics.iloc[:,3].values.tolist()).replace('[','{').replace(']','}').replace('\'','\"')
support = str(model_metrics.iloc[:,4].values.tolist()).replace('[','{').replace(']','}').replace('\'','\"')
roc_auc_scores = str([round(elem,5) for elem in roc_auc_scores]).replace('[','{').replace(']','}').replace('\'','\"')
else:
precision = '{NULL}'
recall = '{NULL}'
fscore = '{NULL}'
support = '{NULL}'
roc_auc_scores = '{NULL}'
sql = """
INSERT INTO {mdl_output_tbl_metrics}
VALUES (
$X${precision}$X$,
$X${recall}$X$,
$X${fscore}$X$,
$X${support}$X$,
$X${roc_auc_scores}$X$,
$X${fnames}$X$,
$X${f_importances}$X$
);
""".format(**locals())
plpy.execute(sql)
    #If a class label was used, create a third output table for ROC curves
    if class_label:
        #Sample ~1000 evenly spaced percentile points along the curve; writing
        #100k+ rows is unnecessary for a good-looking ROC curve
        output_length = 1000
        numbers = list(range(output_length))
        numbers = [100.0 * p / output_length for p in numbers]
thresh_list = sorted(thresholds_df.iloc[:,0].values.tolist())
thresh_nums = []
for x in numbers:
thresh_nums.append(takeClosest(thresh_list,numpy.percentile(thresh_list,x)))
thresh_index = []
for x in thresh_nums:
thresh_index.append(thresh_list.index(x))
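        #thresh_index now holds the row offsets of the thresholds closest to each
        #evenly spaced percentile; only those rows of each curve are persisted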
sql = """
CREATE TABLE {mdl_output_tbl_roc_curve}
(
fpr text[],
tpr text[],
thresholds text[]
)
m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED RANDOMLY');
""".format(**locals())
plpy.execute(sql)
sql = """
INSERT INTO {mdl_output_tbl_roc_curve} VALUES
""".format(**locals())
for x in thresh_index:
sql = sql + """
($X${fpr}$X$,$X${tpr}$X$,$X${thresholds}$X$),""".format(
fpr = str(['%.5f' % round(elem,5) for elem in fpr_df.iloc[x].values.tolist()]).replace('[','{').replace(']','}').replace('\'','\"'),
tpr = str(['%.5f' % round(elem,5) for elem in tpr_df.iloc[x].values.tolist()]).replace('[','{').replace(']','}').replace('\'','\"'),
thresholds = str(['%.5f' % round(elem,5) for elem in thresholds_df.iloc[x].values.tolist()]).replace('[','{').replace(']','}').replace('\'','\"'))
sql = sql[:-1]
plpy.execute(sql)