| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import plpy |
| import re |
| import pandas as pd |
| import xgboost as xgb |
| import numpy |
| import cPickle as pickle |
| import zlib |
| import ast |
| import collections |
| import itertools |
| from bisect import bisect_left |
| from operator import itemgetter |
| |
| from sklearn.model_selection import train_test_split |
| from sklearn.metrics import precision_recall_fscore_support |
| from sklearn.metrics import confusion_matrix |
| from sklearn.metrics import roc_auc_score |
| from sklearn.metrics import roc_curve |
| |
| from utilities.utilities import _assert |
| from utilities.utilities import add_postfix |
| from utilities.utilities import unique_string |
| from utilities.validate_args import get_cols |
| from utilities.validate_args import get_expr_type |
| from utilities.validate_args import input_tbl_valid |
| from utilities.validate_args import output_tbl_valid |
| from utilities.validate_args import cols_in_tbl_valid |
| |
| def serialize_pandas_dframe_as_bytea(schema_madlib, source_table, id_column, |
| class_label, features): |
| """ |
| Load the data from database and compress it to be stored in a single cell. |
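
    The returned bytea round-trips via:
        df = pickle.loads(zlib.decompress(compressed))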
| """ |
| |
| mdl_train_sql = """ |
| SELECT |
| {id_column}, |
| {features}, |
| {class_label} |
| FROM |
| {source_table} |
| """.format(**locals()) |
| |
| result = plpy.execute(mdl_train_sql) |
| df = pd.DataFrame.from_records(result) |
| df_filtered = df.dropna(axis=1, how='all') |
| |
| compressed = zlib.compress(pickle.dumps(df_filtered)) |
| return compressed |
| |
| def print_prec_rec_fscore_support(mat, metric_labels, class_label, class_values): |
| """ |
    Arrange precision, recall, fscore & support in a pandas DataFrame, one row per class value.
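
    e.g. for a binary problem (illustrative values):
        class  precision  recall  fscore  support
        0      0.91       0.88    0.89    120.0
        1      0.86       0.90    0.88    105.0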
| """ |
| tbl = pd.DataFrame(mat, columns=metric_labels) |
| tbl[class_label] = class_values |
| tbl = tbl[[class_label]+metric_labels] |
| return tbl |
| |
| def takeClosest(myList, myNumber): |
| """ |
| Assumes myList is sorted. Returns closest value to myNumber. |
| If two numbers are equally close, return the smallest number. |
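
    e.g. takeClosest([1, 3, 5], 4) returns 3 (the tie is broken toward the
    smaller value) and takeClosest([1, 3, 5], 4.5) returns 5.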
| """ |
| pos = bisect_left(myList, myNumber) |
| if pos == 0: |
| return myList[0] |
| if pos == len(myList): |
| return myList[-1] |
| before = myList[pos - 1] |
| after = myList[pos] |
| if after - myNumber < myNumber - before: |
| return after |
| else: |
| return before |
| |
def expand_grid(params):
    #Expand the params dict into the cross-product of all 'key=value' combinations for grid search
    params_list = []
    for key, val in params.items():
        #If the supplied param is a list of values, expand it out.
        #Strings are iterable too, so exclude them from expansion.
        if val and isinstance(val, collections.Iterable) and not isinstance(val, basestring):
            r = ['{k}={v}'.format(k=key, v=v) for v in val]
        else:
            r = ['{k}={v}'.format(k=key, v=val)]
        params_list.append(r)

    params_grid = list(itertools.product(*params_list))
    return params_grid
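
#e.g. (illustrative; entry order follows dict iteration):
#   expand_grid({'max_depth': [4, 6], 'eta': 0.3})
#   -> [('max_depth=4', 'eta=0.3'), ('max_depth=6', 'eta=0.3')]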
| |
| def try_literal_eval(t): |
| try: |
| ret = ast.literal_eval(t) |
| except Exception: |
| ret = t |
| return ret |
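
#e.g. try_literal_eval('0.3') -> 0.3 (a float), while try_literal_eval('auc')
#returns the string 'auc' unchanged, since it fails literal evaluation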
| |
| def xgboost_train(schema_madlib, dframe, features_all, class_label, params, |
| class_weights, train_set_size, id_column, train_set_split_var): |
| """ |
| Run a single xgboost workload. |
| - Load the data |
| - Split train and test data for scoring |
| - Train xgboost |
| - Calculate metrics to report |
| """ |
| |
| df = pickle.loads(zlib.decompress(dframe)) |
| features_all.append(id_column) |
| features = filter(lambda x: x in df.columns, features_all) |
| X = df[features].as_matrix() |
| y = df[class_label] |
| class_list = numpy.unique(y).tolist() |
| |
| if not train_set_split_var or train_set_split_var == 'None': |
| X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=float(1-train_set_size)) |
        #Keep the test set size constant as the train set size varies, so that
        #runs can be compared apples to apples. Lock it at 20% (smaller only
        #when the train set takes more than 80% of the data).
        test_set_size = min(1 - train_set_size, 0.2)
        X_test = X_test[range(0, int(len(y) * test_set_size)), ]
        y_test = y_test.head(int(len(y) * test_set_size))
| else: |
        split_indx = numpy.where(numpy.array(features) == train_set_split_var)[0]
| X = numpy.delete(X,split_indx,1) |
| X_train = X[numpy.array(df[train_set_split_var]==1),] |
| X_test = X[numpy.array(df[train_set_split_var]==0),] |
| y_train = y[numpy.array(df[train_set_split_var]==1)] |
| y_test = y[numpy.array(df[train_set_split_var]==0)] |
| |
    #The id_column was appended as the last feature column; save it off for the
    #output and drop it from the matrices and the feature list.
    test_ids = X_test[:, len(features) - 1]
    X_train = numpy.delete(X_train, len(features) - 1, 1)
    X_test = numpy.delete(X_test, len(features) - 1, 1)
    features = features[0:len(features) - 1]
| |
| class_list_y_train = numpy.unique(y_train).tolist() |
| class_list_y_test = numpy.unique(y_test).tolist() |
| if (class_list != class_list_y_train) or (class_list != class_list_y_test): |
| plpy.error("Train test split caused a subset with missing classes.") |
| |
    #Compute per-sample weights
    sample_representation = y_train.value_counts()
    total_samples = sum(sample_representation)
    sample_weights = None
    if not class_weights:
        #Default: balanced class weights, normalized to sum to 1 across classes
        norm = sum(total_samples * 1.0 / sample_representation[c]
                   for c in sample_representation.keys())
        sample_weights = map(
            lambda s: (total_samples * 1.0 / sample_representation[s]) / norm,
            y_train
        )
    else:
        #User-supplied class weights
        class_weights_dict = ast.literal_eval(re.sub("[\\t]", "", class_weights).strip())
        sample_weights = map(lambda s: class_weights_dict[s], y_train)
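    #For the default (balanced) branch: e.g. with 75 positive and 25 negative
    #training rows, each negative row gets three times the weight of a positive
    #row (0.75 vs 0.25 after normalizing across classes)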
| |
| #Train gradient boosted trees |
| p_list = [p.split('=') for p in ast.literal_eval(re.sub("[\\t]","",params).strip())] |
| params_dict = dict([(k, try_literal_eval(v.strip())) for k,v in p_list]) |
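    #e.g. params = "('max_depth=6', 'n_estimators=100')" parses to
    #params_dict = {'max_depth': 6, 'n_estimators': 100}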
    eval_metric = params_dict.pop('eval_metric', 'auc')
| gbm = xgb.XGBClassifier(**params_dict) |
| #Fit model |
| gbm.fit( |
| X_train, |
| y_train, |
| eval_metric = eval_metric, |
| sample_weight = sample_weights |
| ) |
| #Compute and return model metrics score |
| y_pred_train = gbm.predict(X_train) |
| y_pred_test = gbm.predict(X_test) |
| cmat_train = confusion_matrix(y_train, y_pred_train) |
| cmat_test = confusion_matrix(y_test, y_pred_test) |
| scores = numpy.array(precision_recall_fscore_support(y_test, y_pred_test)).transpose() |
| |
| metric_labels = ['precision', 'recall', 'fscore', 'support'] |
| model_metrics = print_prec_rec_fscore_support(scores, metric_labels, class_label, gbm.classes_) |
| |
    #Calculate feature importance scores from the underlying booster
    #(get_fscore counts how often each feature is used in a split)
    importance = gbm._Booster.get_fscore()
| if len(importance) == 0: |
| plpy.error("No importance found for any feature") |
| fnames_importances = sorted( |
| [(features[int(k.replace('f',''))], importance[k]) for k in importance], |
| key=itemgetter(1), |
| reverse=True |
| ) |
| fnames, f_importance_scores = zip(*fnames_importances) |
| |
| return (features, pickle.dumps(gbm), params, fnames, f_importance_scores, |
| model_metrics.iloc[:,1].values.tolist(), model_metrics.iloc[:,2].values.tolist(), |
| model_metrics.iloc[:,3].values.tolist(),model_metrics.iloc[:,4].values.tolist(), |
| test_ids) |
| |
| def xgboost_grid_search(schema_madlib, source_table, id_column, class_label, |
| list_of_features, list_of_features_to_exclude, |
| params_str, grid_search_results_tbl, class_weights, |
| train_set_size, train_set_split_var): |
| """ |
| Run multiple xgboost workloads in parallel via grid search. |
| """ |
| |
| input_tbl_valid(source_table, 'XGBoost') |
| cols_in_tbl_valid(source_table, [id_column, class_label], 'XGBoost') |
| if train_set_split_var is not None: |
        cols_in_tbl_valid(source_table, [train_set_split_var], 'XGBoost')
| |
| output_tbl_valid(grid_search_results_tbl, 'XGBoost') |
| grid_search_results_tbl_summary = add_postfix(grid_search_results_tbl, '_summary') |
| output_tbl_valid(grid_search_results_tbl_summary, 'XGBoost') |
| |
| if list_of_features.strip() == '*': |
| #Extract feature names from information_schema |
        if list_of_features_to_exclude is None:
            list_of_features_to_exclude = []
        elif isinstance(list_of_features_to_exclude, basestring):
            #Accept a comma-separated string as well as a list
            list_of_features_to_exclude = [f.strip() for f in list_of_features_to_exclude.split(',')]
        discard_features = list_of_features_to_exclude + [class_label, id_column]
| features = [col for col in get_cols(source_table) if col not in discard_features] |
| list_of_features = ','.join(features) |
| else: |
| features = [f.strip() for f in list_of_features.split(',')] |
| cols_in_tbl_valid(source_table, features, 'XGBoost') |
| |
| class_weights = '' if class_weights is None else class_weights |
| |
| if not params_str: |
| params_str = """ |
| { |
            'learning_rate': [0.3], #Shrinkage on weights (eta); for smaller values, increase n_estimators
            'max_depth': [6], #Larger values can lead to overfitting
            'n_estimators': [100], #More estimators reduce variance (better fit on the test set)
| 'eval_metric':['auc'] |
| } |
| """ |
| |
| params = ast.literal_eval(re.sub("[\\t]","",params_str).strip()) |
| params_grid = expand_grid(params) |
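    #e.g. the default params_str expands to a single-row grid, such as
    #[('learning_rate=0.3', 'max_depth=6', 'n_estimators=100', 'eval_metric=auc')]
    #(entry order follows dict iteration)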
| #Save each parameter list in the grid as a row in a distributed table |
| grid_search_params_temp_tbl = unique_string('grid_params') |
| grid_search_params_temp_tbl_df = unique_string('grid_params_df') |
| dist_str = "m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (params_index)')" |
| sql = """ |
| CREATE TEMP TABLE {grid_search_params_temp_tbl} |
| ( |
| params_index int, |
| params text |
| ) {dist_str} |
| """.format(**locals()) |
| plpy.execute(sql) |
| sql = """ |
| INSERT INTO {grid_search_params_temp_tbl} |
| VALUES ({params_index}, $X${val}$X$); |
| """ |
| for indx, val in enumerate(params_grid): |
| plpy.execute( |
| sql.format( |
| val=val, |
| params_index = indx+1, #postgres indices start from 1, so keeping it consistent |
| grid_search_params_temp_tbl=grid_search_params_temp_tbl |
| ) |
| ) |
| |
| |
| grid_size = len(params_grid) |
| sql = """ |
| CREATE TEMP TABLE {grid_search_params_temp_tbl_df} |
| AS |
| ( |
| SELECT |
| df, |
| generate_series(1, {grid_size}) AS params_index |
| FROM |
| ( |
| SELECT |
| {schema_madlib}.__serialize_pandas_dframe_as_bytea__( |
| '{source_table}', |
| '{id_column}', |
| '{class_label}', |
| '{list_of_features}' |
| ) AS df |
| )q |
| ) {dist_str} |
| """.format(**locals()) |
| plpy.execute(sql) |
| |
| #Invoke XGBoost's train by passing each row from parameter list table. This will run in parallel. |
| grid_search_results_temp_tbl = unique_string('results_tbl') |
    features_str = str(features).replace('[', '').replace(']', '').replace(',', ',\n')
| sql = """ |
| CREATE TEMP TABLE {grid_search_results_temp_tbl} |
| AS |
| ( |
| SELECT |
| {schema_madlib}.__xgboost_train_parallel__( |
| df, |
| ARRAY[ |
| {features_str} |
| ], |
| '{class_label}', |
| params, |
| $CW${class_weights}$CW$, |
| {train_set_size}, |
| '{id_column}', |
| '{train_set_split_var}' |
| ) AS mdl_results, |
| t1.params_index |
| FROM |
| {grid_search_params_temp_tbl} t1, |
| {grid_search_params_temp_tbl_df} t2 |
| WHERE |
| t1.params_index = t2.params_index |
| ) {dist_str} |
| """.format(**locals()) |
| plpy.execute(sql) |
| |
| sql = """ |
| CREATE TABLE {grid_search_results_tbl_summary} |
| AS |
| ( |
| SELECT |
| now() AS mdl_train_ts, |
| '{source_table}'||'_xgboost' AS mdl_name, |
| (mdl_results).features, |
| (mdl_results).params, |
| (mdl_results).fnames, |
| (mdl_results).importance, |
| (mdl_results).precision, |
| (mdl_results).recall, |
| (mdl_results).fscore, |
| (mdl_results).support, |
| (mdl_results).test_ids, |
| params_index |
| FROM |
| {grid_search_results_temp_tbl} |
| |
| ) {dist_str} |
| """.format(**locals()) |
| plpy.execute(sql) |
| |
| sql = """ |
| CREATE TABLE {grid_search_results_tbl} |
| AS |
| ( |
| SELECT |
| (mdl_results).mdl AS model, |
| (mdl_results).features, |
| params_index |
| FROM |
| {grid_search_results_temp_tbl} |
| |
| ) {dist_str} |
| """.format(**locals()) |
| plpy.execute(sql) |
| plpy.execute(""" |
| DROP TABLE {grid_search_params_temp_tbl}; |
| DROP TABLE {grid_search_params_temp_tbl_df}; |
| DROP TABLE {grid_search_results_temp_tbl}; |
| """.format(**locals())) |
| |
| def xgboost_predict(schema_madlib, scoring_tbl, mdl_table, mdl_output_tbl, |
| id_column, class_label, params_index): |
| """ |
| Predict using an xgboost model. Also generate metrics and roc curve tables. |
| """ |
| |
| input_tbl_valid(scoring_tbl, 'XGBoost') |
| cols_in_tbl_valid(scoring_tbl, [id_column], 'XGBoost') |
| if class_label: |
| cols_in_tbl_valid(scoring_tbl, [class_label], 'XGBoost') |
| input_tbl_valid(mdl_table, 'XGBoost') |
| output_tbl_valid(mdl_output_tbl, 'XGBoost') |
| mdl_output_tbl_metrics = add_postfix(mdl_output_tbl, '_metrics') |
| mdl_output_tbl_roc_curve = add_postfix(mdl_output_tbl, '_roc_curve') |
| output_tbl_valid(mdl_output_tbl_metrics, 'XGBoost') |
| output_tbl_valid(mdl_output_tbl_roc_curve, 'XGBoost') |
| |
| id_type = get_expr_type(id_column, scoring_tbl) |
| #Load the serialized XGBoost model from the table |
| mdl_sql = """ |
| SELECT |
| model, |
| features |
| FROM |
| {mdl_table} |
| WHERE params_index = {params_index} |
| """.format(**locals()) |
| result = plpy.execute(mdl_sql) |
| model = result[0]['model'] |
| features = result[0]['features'] |
    #Deserialize the trained gradient boosted trees model
    gbm = pickle.loads(model)
| |
| #Fetch features from test dataset for scoring |
| if isinstance(features, list): |
| features_str = ','.join(features) |
| else: |
| features_str = features |
| features = [features] |
| comma_class_label = ', {0}'.format(class_label) if class_label else '' |
| mdl_score_sql = """ |
| SELECT |
| {id_column}, |
| {features_str} |
| {comma_class_label} |
| FROM |
| {scoring_tbl} |
| """.format(**locals()) |
| result = plpy.execute(mdl_score_sql) |
| |
| df = pd.DataFrame.from_records(result) |
| X_test = df[features] |
| y_test = df[class_label] if class_label else None |
| |
| #Score the test set |
| y_pred_test = gbm.predict(X_test.as_matrix()) |
| y_pred_proba_test = gbm.predict_proba(X_test.as_matrix()) |
| if(class_label): |
| cmat_test = confusion_matrix(y_test, y_pred_test) |
| scores = numpy.array(precision_recall_fscore_support(y_test, y_pred_test)).transpose() |
| metric_labels = ['precision', 'recall', 'fscore', 'support'] |
| model_metrics = print_prec_rec_fscore_support(scores, metric_labels, class_label, gbm.classes_) |
| else: |
| model_metrics = 'NA' |
| |
| predicted_class_label = class_label+'_predicted' if class_label else 'class_label_predicted' |
| predicted_class_proba_label = class_label+'_proba_predicted' if class_label else 'class_label_proba_predicted' |
    pred = pd.Series(y_pred_test, index=X_test.index).to_frame(predicted_class_label)
    num_unique_classes = pred[predicted_class_label].nunique()
    if num_unique_classes == 1:
        plpy.error('XGBoost: Every prediction is of the same class.')

    #One probability column per class known to the model
    num_proba_cols = y_pred_proba_test.shape[1]
    pred_proba = pd.DataFrame(data=y_pred_proba_test,
                              index=X_test.index,
                              columns=range(num_proba_cols))
| |
| res_df = pd.concat([df[id_column],pred,pred_proba],axis=1).set_index(X_test.index) |
| |
    #Combine all the probability values into a single array-literal column
    res_df['all_class_probas'] = '{' + res_df[0].map(str)
    for class_col in range(1, num_proba_cols):
        res_df['all_class_probas'] = res_df['all_class_probas'] + ',' + res_df[class_col].map(str)
    res_df['all_class_probas'] = res_df['all_class_probas'] + '}'
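    #e.g. a two-class row yields all_class_probas = '{0.12,0.88}'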
| |
| #Feature importance scores |
| importance = gbm._Booster.get_fscore() |
| |
    if len(importance) == 0:
        plpy.error("No importance found for any feature")
    elif len(importance) > 1:
        fnames_importances = sorted(
            [(features[int(k.replace('f',''))], importance[k]) for k in importance],
            key=itemgetter(1),
            reverse=True
        )
        fnames, f_importances = zip(*fnames_importances)
        fnames = str(fnames).replace('(','{').replace(')','}').replace('\'','\"')
        f_importances = str(f_importances).replace('(','{').replace(')','}').replace('\'','\"')
    else:
        #Single importance entry: map the lone booster feature (e.g. 'f0')
        #back to its column name and emit one-element array literals
        k = list(importance.keys())[0]
        fnames = '{{"{0}"}}'.format(features[int(k.replace('f', ''))])
        f_importances = '{{{0}}}'.format(importance[k])
| |
| ret_dict = res_df.to_dict('records') |
| ret_result = ( |
| ( |
| r[id_column], |
| r[predicted_class_label], |
| r['all_class_probas'] |
| ) |
| for r in ret_dict |
| ) |
| |
| #Create a ROC Curve if testing on a set with class label |
| if (class_label): |
| class_list = numpy.unique(y_test) |
| roc_auc_scores, fpr, tpr, thresholds = [],[],[],[] |
        for index, classname in enumerate(class_list):
            roc_auc_scores.append(roc_auc_score(numpy.array(y_test) == classname, y_pred_proba_test[:, index]))
| t_fpr, t_tpr, t_thresholds = roc_curve(numpy.array(y_test),y_pred_proba_test[:,index],pos_label=classname) |
| fpr.append(t_fpr) |
| tpr.append(t_tpr) |
| thresholds.append(t_thresholds) |
| fpr_df = pd.DataFrame(fpr).transpose() |
| tpr_df = pd.DataFrame(tpr).transpose() |
| thresholds_df = pd.DataFrame(thresholds).transpose() |
| else: |
| roc_auc_scores = [0] |
| |
| #Create a table to hold the unit-level results |
| sql = """ |
| CREATE TABLE {mdl_output_tbl} |
| ( |
| {id_column} {id_type}, |
| {predicted_class_label} TEXT, |
| {predicted_class_proba_label} FLOAT8[] |
| ) |
| m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY ({id_column})'); |
| """.format(**locals()) |
| plpy.execute(sql) |
| sql = """ |
| INSERT INTO {mdl_output_tbl} VALUES |
| |
| """.format(**locals()) |
| for row in ret_result: |
| sql = sql + """ |
| {0},""".format(row) |
| sql = sql[:-1] |
| plpy.execute(sql) |
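    #The statement executed above has the shape (illustrative):
    #   INSERT INTO <mdl_output_tbl> VALUES
    #   (101, 'yes', '{0.12,0.88}'),
    #   (102, 'no', '{0.91,0.09}')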
| |
| #Create a table for holding the metrics and feature importances |
| sql = """ |
| CREATE TABLE {mdl_output_tbl_metrics} |
| ( |
| precision DOUBLE PRECISION[], |
| recall DOUBLE PRECISION[], |
| fscore DOUBLE PRECISION[], |
| support DOUBLE PRECISION[], |
| roc_auc_scores DOUBLE PRECISION[], |
| feature_names TEXT[], |
| feature_importance_scores DOUBLE PRECISION[] |
| ) |
| m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED RANDOMLY'); |
| """.format(**locals()) |
| plpy.execute(sql) |
| |
    #Format the metrics as Postgres array literals for output
| if(class_label): |
| precision = str(model_metrics.iloc[:,1].values.tolist()).replace('[','{').replace(']','}').replace('\'','\"') |
| recall = str(model_metrics.iloc[:,2].values.tolist()).replace('[','{').replace(']','}').replace('\'','\"') |
| fscore = str(model_metrics.iloc[:,3].values.tolist()).replace('[','{').replace(']','}').replace('\'','\"') |
| support = str(model_metrics.iloc[:,4].values.tolist()).replace('[','{').replace(']','}').replace('\'','\"') |
| roc_auc_scores = str([round(elem,5) for elem in roc_auc_scores]).replace('[','{').replace(']','}').replace('\'','\"') |
| else: |
| precision = '{NULL}' |
| recall = '{NULL}' |
| fscore = '{NULL}' |
| support = '{NULL}' |
| roc_auc_scores = '{NULL}' |
| |
| sql = """ |
| INSERT INTO {mdl_output_tbl_metrics} |
| VALUES ( |
| $X${precision}$X$, |
| $X${recall}$X$, |
| $X${fscore}$X$, |
| $X${support}$X$, |
| $X${roc_auc_scores}$X$, |
| $X${fnames}$X$, |
| $X${f_importances}$X$ |
| ); |
| """.format(**locals()) |
| |
| plpy.execute(sql) |
| |
| #If a class label was used, create a third output table for roc curves |
| if (class_label): |
        #Save a fixed number of evenly spaced points (by percentile) rather than
        #every threshold; 1000 points are plenty for a smooth-looking curve.
        output_length = 1000
        numbers = [100.0 * p / output_length for p in range(output_length)]
| thresh_list = sorted(thresholds_df.iloc[:,0].values.tolist()) |
| thresh_nums = [] |
| for x in numbers: |
| thresh_nums.append(takeClosest(thresh_list,numpy.percentile(thresh_list,x))) |
| |
| thresh_index = [] |
| for x in thresh_nums: |
| thresh_index.append(thresh_list.index(x)) |
| |
| sql = """ |
| CREATE TABLE {mdl_output_tbl_roc_curve} |
| ( |
| fpr text[], |
| tpr text[], |
| thresholds text[] |
| ) |
| m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED RANDOMLY'); |
| """.format(**locals()) |
| plpy.execute(sql) |
| |
| sql = """ |
| INSERT INTO {mdl_output_tbl_roc_curve} VALUES |
| |
| """.format(**locals()) |
| for x in thresh_index: |
| sql = sql + """ |
| ($X${fpr}$X$,$X${tpr}$X$,$X${thresholds}$X$),""".format( |
| fpr = str(['%.5f' % round(elem,5) for elem in fpr_df.iloc[x].values.tolist()]).replace('[','{').replace(']','}').replace('\'','\"'), |
| tpr = str(['%.5f' % round(elem,5) for elem in tpr_df.iloc[x].values.tolist()]).replace('[','{').replace(']','}').replace('\'','\"'), |
| thresholds = str(['%.5f' % round(elem,5) for elem in thresholds_df.iloc[x].values.tolist()]).replace('[','{').replace(']','}').replace('\'','\"')) |
| sql = sql[:-1] |
| plpy.execute(sql) |