| #!/usr/bin/python3 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import getopt |
| import json |
| import sys |
| from pathlib import Path |
| |
| import numpy as np |
| import psycopg2 as pg |
| import sqlite3 as sq |
| import xgboost as xgb |
| from sklearn import preprocessing |
| |
| |
| # import matplotlib.pyplot as plt |
| |
| # ######################################################## |
| # METHODS |
| # ######################################################## |
def format_row(duration, inputsize, jvmmemsize, totalmemsize, vertex_properties, edge_properties):
    """Format a DB row as a LibSVM line: the duration label followed by indexed features."""
    duration_in_sec = int(duration) // 1000
    inputsize_in_10kb = int(inputsize) // 10240  # can express up to around 20TB within the int range
    jvmmemsize_in_mb = int(jvmmemsize) // 1048576
    totalmemsize_in_mb = int(totalmemsize) // 1048576
    return f'{duration_in_sec} 0:{inputsize_in_10kb} 1:{jvmmemsize_in_mb} 2:{totalmemsize_in_mb} {vertex_properties} {edge_properties}'
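
# A quick illustration with made-up numbers (not real DB values):
# format_row(123000, 10485760, 1048576, 2097152, '3:1', '4:0')
#   == '123 0:1024 1:1 2:2 3:1 4:0'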
| |
| |
| # ######################################################## |
def load_data_from_db(tablename):
    conn = None

    try:
        host = "nemo-optimization.cabbufr3evny.us-west-2.rds.amazonaws.com"
        dbname = "nemo_optimization"
        dbuser = "postgres"
        dbpwd = "fake_password"
        conn = pg.connect(host=host, dbname=dbname, user=dbuser, password=dbpwd)
        print("Connected to the PostgreSQL DB.")
    except pg.Error:
        try:
            sqlite_file = "./optimization_db.sqlite"
            conn = sq.connect(sqlite_file)
            print("Connected to the SQLite DB.")
        except sq.Error:
            print("I am unable to connect to the database. Try running the script with `./bin/xgboost_optimization.sh`")
            sys.exit(1)

    sql = "SELECT * FROM " + tablename  # tablename comes from the CLI and is assumed trusted
    cur = conn.cursor()
    try:
        cur.execute(sql)
        print("Loaded data from the DB.")
    except (pg.Error, sq.Error):
        print("I can't run " + sql)
        sys.exit(1)

    # row[1:7] map to format_row's parameters: duration, inputsize, jvmmemsize,
    # totalmemsize, vertex_properties, edge_properties (row[0] is skipped).
    rows = cur.fetchall()
    processed_rows = [format_row(row[1], row[2], row[3], row[4], row[5], row[6]) for row in rows]
    cur.close()
    conn.close()
    return processed_rows
| |
| |
| # ######################################################## |
def write_to_file(filename, rows):
    with open(filename, 'w') as f:
        for row in rows:
            f.write(row + "\n")
| |
| |
def encode_processed_rows(processed_rows, col_to_id):
    """Rewrite each 'col:value' feature pair using the dense ids from col_to_id."""
| for i, row in enumerate(processed_rows): |
| arr = row.split() |
| for j, it in enumerate(arr[1:]): |
| k, v = it.split(':') |
| ek = col_to_id[int(k)] |
| arr[j + 1] = f'{ek}:{v}' |
| processed_rows[i] = ' '.join(arr) |
| return processed_rows |
| |
| |
def decode_rows(rows, id_to_col):
    """Inverse of encode_processed_rows: restore the original column ids."""
| for i, row in enumerate(rows): |
| arr = row.split() |
| for j, it in enumerate(arr[1:]): |
| ek, v = it.split(':') |
| k = id_to_col[int(ek)] |
| arr[j + 1] = f'{k}:{v}' |
| rows[i] = ' '.join(arr) |
| return rows |
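
# The two helpers are inverses given matching dictionaries; e.g. with the hypothetical
# mappings col_to_id = {7: 0} and id_to_col = {0: 7}:
# encode_processed_rows(['12 7:3'], {7: 0})  ==  ['12 0:3']
# decode_rows(['12 0:3'], {0: 7})            ==  ['12 7:3']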
| |
| |
| # ######################################################## |
| def stringify_num(num): |
| return str(round(num, 2)) |
| |
| |
def dict_union(d1, d2):
    """Merge importance dicts of the form {feature: {split: diff}} into d1."""
    for k, v in d2.items():
        if k in d1:
            if isinstance(d1[k], dict) and isinstance(v, dict):  # same 'feature': merge inner split dicts
                d1[k] = dict_union(d1[k], v)
            else:  # same 'split': accumulate the diffs
                d1[k] = d1[k] + v
        elif isinstance(v, dict):  # new feature: no initial data, take it as-is
            d1[k] = v
        else:  # k = split, v = diff. Include only if it does not violate existing entries.
            if v > 0 > max(d1.values()) and k < max(d1.keys()):  # no positive diff yet
                d1[k] = v
            elif v > max(d1.values()) > 0:  # keep only the greatest positive diff
                max_key = max(d1, key=lambda key: d1[key])
                del d1[max_key]
                d1[k] = v
            elif v < 0 < min(d1.values()) and min(d1.keys()) < k:  # no negative diff yet
                d1[k] = v
            elif v < min(d1.values()) < 0:  # keep only the smallest negative diff
                min_key = min(d1, key=lambda key: d1[key])
                del d1[min_key]
                d1[k] = v
    return d1
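
# A small sketch of the merge semantics (hypothetical values): diffs for the same split
# accumulate, while a new split is only added or kept if it does not contradict the
# existing positive/negative extremes.
# dict_union({'f1': {10: 5.0}}, {'f1': {10: 2.0, 20: -3.0}})
#   == {'f1': {10: 7.0, 20: -3.0}}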
| |
| |
| # ######################################################## |
class Tree:
    def __init__(self):
        self.root = None
        self.idx_to_node = {}  # per-instance, so separate Tree objects don't share nodes

    def append_to_dict_if_not_exists(self, idx, node):
        if idx not in self.idx_to_node:
            self.idx_to_node[idx] = node
| |
| def addNode(self, index, feature_id, split, yes, no, missing, value): |
| n = None |
        if self.root is None:
| self.root = Node(None) |
| n = self.root |
| self.append_to_dict_if_not_exists(index, n) |
| else: |
| n = self.idx_to_node[index] |
| |
| self.append_to_dict_if_not_exists(yes, Node(n)) |
| self.append_to_dict_if_not_exists(no, Node(n)) |
| self.append_to_dict_if_not_exists(missing, Node(n)) |
| n.addAttributes(index, feature_id, split, yes, no, missing, value, self.idx_to_node) |
| |
| def importanceDict(self): |
| return self.root.importanceDict() |
| |
| def __str__(self): |
| return json.dumps(json.loads(str(self.root)), indent=4) |
| |
| |
class Node:
    def __init__(self, parent):
        self.parent = parent
        self.index = None

        self.feature = None
        self.split = None
        self.left = None
        self.right = None
        self.missing = None

        self.value = None
| |
| def addAttributes(self, index, feature_id, split, yes, no, missing, value, idx_to_node): |
| self.index = index |
| if feature_id == 'Leaf': |
| self.value = value |
| else: |
| self.feature = feature_id |
| self.split = split |
| self.left = idx_to_node[yes] |
| self.right = idx_to_node[no] |
| self.missing = idx_to_node[missing] |
| |
    def isLeaf(self):
        return self.value is not None

    def isRoot(self):
        return self.parent is None
| |
| def getIndex(self): |
| return self.index |
| |
| def getLeft(self): |
| return self.left |
| |
| def getRight(self): |
| return self.right |
| |
| def getMissing(self): |
| return self.missing |
| |
    def getApprox(self):
        """Approximate this subtree's value; a side under 4% of the other is treated as noise."""
        if self.isLeaf():
            return self.value
        else:
            lapprox = self.left.getApprox()
            rapprox = self.right.getApprox()
            if rapprox != 0 and abs(lapprox / rapprox) < 0.04:  # left is under 4% of right: ignore it
                return rapprox
            elif lapprox != 0 and abs(rapprox / lapprox) < 0.04:  # right is under 4% of left: ignore it
                return lapprox
            else:
                return (lapprox + rapprox) / 2
| |
    def getDiff(self):
        """Difference between the left and right subtree approximations; 0 when one side is negligible."""
        lapprox = self.left.getApprox()
        rapprox = self.right.getApprox()
        if (rapprox != 0 and abs(lapprox / rapprox) < 0.04) or (lapprox != 0 and abs(rapprox / lapprox) < 0.04):
            return 0  # one side is under 4% of the other: treat as no difference
        return lapprox - rapprox
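
    # For intuition (hypothetical numbers): lapprox=100, rapprox=60 gives a diff of 40,
    # i.e. rows that take the 'yes' branch (feature < split) contribute a larger
    # predicted duration than those taking the 'no' branch.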
| |
| def importanceDict(self): |
| if self.isLeaf(): |
| return {} |
| else: |
| d = {} |
| d[self.feature] = {self.split: self.getDiff()} |
| return dict_union(d, dict_union(self.left.importanceDict(), self.right.importanceDict())) |
| |
| def __str__(self): |
| if self.isLeaf(): |
| return f'{stringify_num(self.value)}' |
| else: |
| left = str(self.left) if self.left.isLeaf() else json.loads(str(self.left)) |
| right = str(self.right) if self.right.isLeaf() else json.loads(str(self.right)) |
| return json.dumps({self.index: f'{self.feature}' + '{' + stringify_num(self.getApprox()) + ',' + stringify_num( |
| self.getDiff()) + '}', 'L' + self.left.getIndex(): left, 'R' + self.right.getIndex(): right}) |
| |
| |
| # ######################################################## |
| # MAIN FUNCTION |
| # ######################################################## |
try:
    opts, args = getopt.getopt(sys.argv[1:], "ht:m:i:", ["tablename=", "memsize=", "inputsize="])
except getopt.GetoptError:
    print('nemo_xgboost_optimization.py -t <tablename>')
    sys.exit(2)
tablename = None
for opt, arg in opts:
    if opt == '-h':
        print('nemo_xgboost_optimization.py -t <tablename>')
        sys.exit()
    elif opt in ("-t", "--tablename"):
        tablename = arg
    elif opt in ("-m", "--memsize"):  # parsed but currently unused
        memsize = arg
    elif opt in ("-i", "--inputsize"):  # parsed but currently unused
        inputsize = arg
if tablename is None:  # -t is required; fail early instead of a NameError below
    print('nemo_xgboost_optimization.py -t <tablename>')
    sys.exit(2)
| |
| modelname = tablename + "_bst.model" |
| processed_rows = load_data_from_db(tablename) |
| # write_to_file('process_test', processed_rows) |
| |
| ## Make Dictionary |
| col = [] |
| for row in processed_rows: |
| arr = row.split() |
| for it in arr[1:]: |
| k, v = it.split(':') |
| col.append(int(k)) |
| le = preprocessing.LabelEncoder() |
| ids = le.fit_transform(col) |
| col_to_id = dict(zip(col, ids)) |
| id_to_col = dict(zip(ids, col)) |
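# For illustration with hypothetical input: if the raw column ids seen were [7, 3, 7],
# LabelEncoder assigns dense ids in sorted order, so col_to_id == {3: 0, 7: 1}
# and id_to_col == {0: 3, 1: 7}.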
| |
## PREPROCESSING DATA FOR TRAINING
| encoded_rows = encode_processed_rows(processed_rows, col_to_id) |
| write_to_file('nemo_optimization.out', encoded_rows) |
| # write_to_file('decode_test', decode_rows(encoded_rows, id_to_col)) |
ddata = xgb.DMatrix('nemo_optimization.out')  # LibSVM text input; newer XGBoost may require the 'nemo_optimization.out?format=libsvm' URI form
| |
| avg_20_duration = np.mean(ddata.get_label()[:20]) |
| print("average job duration: ", avg_20_duration) |
allowance = avg_20_duration // 25  # a prediction within 4% of the average duration counts as accurate
| |
| row_size = len(processed_rows) |
| print("total_rows: ", row_size) |
| |
| ## TRAIN THE MODEL (REGRESSION) |
dtrain = ddata.slice([i for i in range(0, row_size) if i % 6 != 5])  # five of every six rows for training
print("train_rows: ", dtrain.num_row())
dtest = ddata.slice([i for i in range(0, row_size) if i % 6 == 5])  # every sixth row for testing
| print("test_rows: ", dtest.num_row()) |
| labels = dtest.get_label() |
| |
| ## Load existing booster, if it exists |
| bst_opt = xgb.Booster(model_file=modelname) if Path(modelname).is_file() else None |
| preds_opt = bst_opt.predict(dtest) if bst_opt is not None else None |
| error_opt = (sum(1 for i in range(len(preds_opt)) if abs(preds_opt[i] - labels[i]) > allowance) / float( |
| len(preds_opt))) if preds_opt is not None else 1 |
| print('opt_error=%f' % error_opt) |
| min_error = error_opt |
| |
| learning_rates = [0.1, 0.3, 0.4, 0.5, 0.6, 0.7, 0.9] |
| for lr in learning_rates: |
    # 'reg:linear' is deprecated in recent XGBoost releases in favor of 'reg:squarederror'
    param = {'max_depth': 6, 'eta': lr, 'verbosity': 0, 'objective': 'reg:linear'}
| |
| watchlist = [(dtest, 'eval'), (dtrain, 'train')] |
| num_round = row_size // 10 |
| bst = xgb.train(param, dtrain, num_round, watchlist, early_stopping_rounds=5) |
| |
| preds = bst.predict(dtest) |
| error = (sum(1 for i in range(len(preds)) if abs(preds[i] - labels[i]) > allowance) / float(len(preds))) if len( |
| preds) > 0 else 1.0 |
| print('error=%f' % error) |
| |
    ## Save the better booster (compare against the best error so far, not just the initial one)
    if error <= min_error:
        bst_opt = bst
        bst.save_model(modelname)
        min_error = error
| |
| print('minimum error=%f' % min_error) |
| |
| ## Let's now use bst_opt |
| ## Check out the histogram by uncommenting the lines below |
| # fscore = bst_opt.get_fscore() |
| # sorted_fscore = sorted(fscore.items(), key=lambda kv: kv[1]) |
| # for i in range(len(sorted_fscore)): |
| # print("\nSplit Value Histogram:") |
| # feature = sorted_fscore.pop()[0] |
| # print(feature, "=", id_to_col[int(feature[1:])]) |
| # hg = bst_opt.get_split_value_histogram(feature) |
| # print(hg) |
| |
| df = bst_opt.trees_to_dataframe() |
| # print("Trees to dataframe") |
| # print(df) |
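# trees_to_dataframe() gives one row per node; the columns used below are 'Tree' (tree
# number), 'ID' (node id of the form '<tree>-<node>'), 'Feature' ('f<k>' for split nodes,
# 'Leaf' for leaves), 'Split', the 'Yes'/'No'/'Missing' child ids, and 'Gain' (which holds
# the leaf value for leaf rows).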
| |
| trees = {} |
| for index, row in df.iterrows(): |
    if row['Tree'] not in trees:  # one Tree object per tree number
| trees[row['Tree']] = Tree() |
| |
| translated_feature = id_to_col[int(row['Feature'][1:])] if row['Feature'].startswith('f') else row['Feature'] |
| # print(translated_feature) |
| trees[row['Tree']].addNode(row['ID'], translated_feature, row['Split'], row['Yes'], row['No'], row['Missing'], |
| row['Gain']) |
| |
| results = {} |
| print("\nGenerated Trees:") |
| for t in trees.values(): |
| results = dict_union(results, t.importanceDict()) |
| # print(t) |
| |
| print("\nImportanceDict") |
| print(json.dumps(results, indent=2)) |
| |
| print("\nSummary") |
| resultsJson = [] |
| for k, v in results.items(): |
| for kk, vv in v.items(): |
| resultsJson.append({'feature': k, 'split': kk, 'val': vv}) |
| how = 'greater' if vv > 0 else 'smaller' |
| restring = f'{k} should be {how} than {kk}' |
| print(restring) |
| |
| with open("results.out", "w") as file: |
| file.write(json.dumps(resultsJson, indent=2)) |
| |
| # Visualize tree |
| # xgb.plot_tree(bst_opt) |
| # plt.show() |