#!/usr/bin/python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import getopt
import json
import sys
from pathlib import Path
import numpy as np
import psycopg2 as pg
import sqlite3 as sq
import xgboost as xgb
from sklearn import preprocessing
# import matplotlib.pyplot as plt
# ########################################################
# METHODS
# ########################################################
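# format_row emits one LIBSVM-style line per job record: '<label> <index>:<value> ...',
# with the duration (in seconds) as the label. As an illustration (values chosen
# arbitrarily): duration=120000ms, inputsize=10MB, jvmmemsize=512MB, totalmemsize=1GB
# yields a line starting with '120 0:1024 1:512 2:1024', followed by the
# vertex/edge property features.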
def format_row(duration, inputsize, jvmmemsize, totalmemsize, vertex_properties, edge_properties):
    duration_in_sec = int(duration) // 1000
    inputsize_in_10kb = int(inputsize) // 10240  # capable of expressing up to around 20TB within the int range
    jvmmemsize_in_mb = int(jvmmemsize) // 1048576
    totalmemsize_in_mb = int(totalmemsize) // 1048576
    return f'{duration_in_sec} 0:{inputsize_in_10kb} 1:{jvmmemsize_in_mb} 2:{totalmemsize_in_mb} {vertex_properties} {edge_properties}'
# ########################################################
def load_data_from_db(tablename):
    conn = None
    try:
        host = "nemo-optimization.cabbufr3evny.us-west-2.rds.amazonaws.com"
        dbname = "nemo_optimization"
        dbuser = "postgres"
        dbpwd = "fake_password"
        conn = pg.connect(host=host, dbname=dbname, user=dbuser, password=dbpwd)
        print("Connected to the PostgreSQL DB.")
    except Exception:
        try:
            sqlite_file = "./optimization_db.sqlite"
            conn = sq.connect(sqlite_file)
            print("Connected to the SQLite DB.")
        except Exception:
            print("Unable to connect to a database. Try running the script with `./bin/xgboost_optimization.sh`")
            sys.exit(1)
    sql = "SELECT * from " + tablename
    cur = conn.cursor()
    try:
        cur.execute(sql)
        print("Loaded data from the DB.")
    except Exception:
        print("Unable to run: " + sql)
        sys.exit(1)
    rows = cur.fetchall()
    processed_rows = [format_row(row[1], row[2], row[3], row[4], row[5], row[6]) for row in rows]
    cur.close()
    conn.close()
    return processed_rows
# ########################################################
def write_to_file(filename, rows):
    with open(filename, 'w') as f:
        for row in rows:
            f.write(row + "\n")
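# The DB stores features under sparse, arbitrary column ids, while DMatrix expects a
# dense 0..n-1 feature index space. encode_processed_rows rewrites each '<col>:<value>'
# pair to '<id>:<value>' using the LabelEncoder mapping built in the main function
# below, and decode_rows applies the inverse mapping.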
def encode_processed_rows(processed_rows, col_to_id):
for i, row in enumerate(processed_rows):
arr = row.split()
for j, it in enumerate(arr[1:]):
k, v = it.split(':')
ek = col_to_id[int(k)]
arr[j + 1] = f'{ek}:{v}'
processed_rows[i] = ' '.join(arr)
return processed_rows
def decode_rows(rows, id_to_col):
for i, row in enumerate(rows):
arr = row.split()
for j, it in enumerate(arr[1:]):
ek, v = it.split(':')
k = id_to_col[int(ek)]
arr[j + 1] = f'{k}:{v}'
rows[i] = ' '.join(arr)
return rows
# ########################################################
def stringify_num(num):
return str(round(num, 2))
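# dict_union merges two nested importance dictionaries of the form {feature: {split: diff}}.
# Matching features are merged recursively and matching splits have their diffs summed;
# a brand-new split is admitted only when no entry of its sign exists yet (and the split
# ordering is not violated), or when it strictly dominates the current extreme
# (largest positive diff / smallest negative diff), so only the most significant
# splits per feature survive.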
def dict_union(d1, d2):
for k, v in d2.items():
if k in d1:
if type(d1[k]) is dict and type(v) is dict: # When same 'feature'
d1[k] = dict_union(d1[k], v)
else: # When same 'split'
d1[k] = d1[k] + v
elif type(v) is dict: # When no initial data
d1[k] = v
else: # k = split, v = diff. include if it does not violate.
if v > 0 > max(d1.values()) and k < max(d1.keys()): # If no positive values yet
d1[k] = v
elif v > max(d1.values()) > 0: # Update if greater value
max_key = max(d1, key=lambda key: d1[key])
del d1[max_key]
d1[k] = v
elif v < 0 < min(d1.values()) and min(d1.keys()) < k: # If no negative values yet
d1[k] = v
elif v < min(d1.values()) < 0: # Update if smaller value
min_key = min(d1, key=lambda key: d1[key])
del d1[min_key]
d1[k] = v
return d1
# ########################################################
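# Tree and Node rebuild each booster tree from the rows of trees_to_dataframe():
# split nodes carry a feature, a split value and yes/no/missing child IDs, while
# 'Leaf' rows carry the leaf value in their Gain column.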
class Tree:
    def __init__(self):
        self.root = None
        self.idx_to_node = {}  # node ID (e.g. '0-0') -> Node, per tree
    def append_to_dict_if_not_exists(self, idx, node):
        if idx not in self.idx_to_node:
            self.idx_to_node[idx] = node
    def addNode(self, index, feature_id, split, yes, no, missing, value):
        if self.root is None:
            self.root = Node(None)
            self.append_to_dict_if_not_exists(index, self.root)
        n = self.idx_to_node[index]
        if feature_id != 'Leaf':  # register the children before addAttributes links them
            self.append_to_dict_if_not_exists(yes, Node(n))
            self.append_to_dict_if_not_exists(no, Node(n))
            self.append_to_dict_if_not_exists(missing, Node(n))
        n.addAttributes(index, feature_id, split, yes, no, missing, value, self.idx_to_node)
def importanceDict(self):
return self.root.importanceDict()
def __str__(self):
return json.dumps(json.loads(str(self.root)), indent=4)
class Node:
    def __init__(self, parent):
        self.parent = parent
        self.index = None
        self.feature = None
        self.split = None
        self.left = None
        self.right = None
        self.missing = None
        self.value = None
def addAttributes(self, index, feature_id, split, yes, no, missing, value, idx_to_node):
self.index = index
if feature_id == 'Leaf':
self.value = value
else:
self.feature = feature_id
self.split = split
self.left = idx_to_node[yes]
self.right = idx_to_node[no]
self.missing = idx_to_node[missing]
    def isLeaf(self):
        return self.value is not None
    def isRoot(self):
        return self.parent is None
def getIndex(self):
return self.index
def getLeft(self):
return self.left
def getRight(self):
return self.right
def getMissing(self):
return self.missing
def getApprox(self):
if self.isLeaf():
return self.value
else:
lapprox = self.left.getApprox()
rapprox = self.right.getApprox()
if rapprox != 0 and abs(lapprox / rapprox) < 0.04: # smaller than 4% then ignore
return rapprox
elif lapprox != 0 and abs(rapprox / lapprox) < 0.04:
return lapprox
else:
return (lapprox + rapprox) / 2
def getDiff(self):
lapprox = self.left.getApprox()
rapprox = self.right.getApprox()
if (rapprox != 0 and abs(lapprox / rapprox) < 0.04) or (lapprox != 0 and abs(rapprox / lapprox) < 0.04):
return 0 # ignore
return lapprox - rapprox
def importanceDict(self):
if self.isLeaf():
return {}
else:
d = {}
d[self.feature] = {self.split: self.getDiff()}
return dict_union(d, dict_union(self.left.importanceDict(), self.right.importanceDict()))
def __str__(self):
if self.isLeaf():
return f'{stringify_num(self.value)}'
else:
left = str(self.left) if self.left.isLeaf() else json.loads(str(self.left))
right = str(self.right) if self.right.isLeaf() else json.loads(str(self.right))
return json.dumps({self.index: f'{self.feature}' + '{' + stringify_num(self.getApprox()) + ',' + stringify_num(
self.getDiff()) + '}', 'L' + self.left.getIndex(): left, 'R' + self.right.getIndex(): right})
# ########################################################
# MAIN FUNCTION
# ########################################################
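# Usage: ./bin/xgboost_optimization.sh, or directly:
#   nemo_xgboost_optimization.py -t <tablename> [-m <memsize>] [-i <inputsize>]
# (-m and -i are parsed below but not otherwise used in this script.)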
usage = 'nemo_xgboost_optimization.py -t <tablename> [-m <memsize>] [-i <inputsize>]'
tablename = None
try:
    opts, args = getopt.getopt(sys.argv[1:], "ht:m:i:", ["tablename=", "memsize=", "inputsize="])
except getopt.GetoptError:
    print(usage)
    sys.exit(2)
for opt, arg in opts:
    if opt == '-h':
        print(usage)
        sys.exit()
    elif opt in ("-t", "--tablename"):
        tablename = arg
    elif opt in ("-m", "--memsize"):
        memsize = arg
    elif opt in ("-i", "--inputsize"):
        inputsize = arg
if tablename is None:  # -t is required; bail out with the usage message otherwise
    print(usage)
    sys.exit(2)
modelname = tablename + "_bst.model"
processed_rows = load_data_from_db(tablename)
# write_to_file('process_test', processed_rows)
## Make Dictionary
col = []
for row in processed_rows:
arr = row.split()
for it in arr[1:]:
k, v = it.split(':')
col.append(int(k))
le = preprocessing.LabelEncoder()
ids = le.fit_transform(col)
col_to_id = dict(zip(col, ids))
id_to_col = dict(zip(ids, col))
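# col_to_id maps each original column id to a dense id in 0..n-1 (and id_to_col back);
# e.g. columns [0, 1, 2, 4713] would map to ids [0, 1, 2, 3].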
## PREPROCESSING DATA FOR TRAINING
encoded_rows = encode_processed_rows(processed_rows, col_to_id)
write_to_file('nemo_optimization.out', encoded_rows)
# write_to_file('decode_test', decode_rows(encoded_rows, id_to_col))
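# DMatrix parses the LIBSVM-format text file written above; the labels are the
# job durations in seconds.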
ddata = xgb.DMatrix('nemo_optimization.out')
avg_20_duration = np.mean(ddata.get_label()[:20])
print("average job duration: ", avg_20_duration)
allowance = avg_20_duration // 25 # 4%
row_size = len(processed_rows)
print("total_rows: ", row_size)
## TRAIN THE MODEL (REGRESSION)
dtrain = ddata.slice([i for i in range(0, row_size) if i % 6 != 5])  # ~5/6 of the rows (index mod 6 != 5)
print("train_rows: ", dtrain.num_row())
dtest = ddata.slice([i for i in range(0, row_size) if i % 6 == 5])  # the held-out ~1/6 (index mod 6 == 5)
print("test_rows: ", dtest.num_row())
labels = dtest.get_label()
## Load existing booster, if it exists
bst_opt = xgb.Booster(model_file=modelname) if Path(modelname).is_file() else None
preds_opt = bst_opt.predict(dtest) if bst_opt is not None else None
error_opt = (sum(1 for i in range(len(preds_opt)) if abs(preds_opt[i] - labels[i]) > allowance) / float(
len(preds_opt))) if preds_opt is not None else 1
print('opt_error=%f' % error_opt)
min_error = error_opt
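# Simple grid search over learning rates: each candidate is trained for
# row_size // 10 rounds (with early stopping) and kept only if it beats the
# best error seen so far.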
learning_rates = [0.1, 0.3, 0.4, 0.5, 0.6, 0.7, 0.9]
for lr in learning_rates:
    param = {'max_depth': 6, 'eta': lr, 'verbosity': 0, 'objective': 'reg:linear'}
    watchlist = [(dtest, 'eval'), (dtrain, 'train')]
    num_round = row_size // 10
    bst = xgb.train(param, dtrain, num_round, watchlist, early_stopping_rounds=5)
    preds = bst.predict(dtest)
    error = (sum(1 for i in range(len(preds)) if abs(preds[i] - labels[i]) > allowance) / float(len(preds))) if len(
        preds) > 0 else 1.0
    print('error=%f' % error)
    ## Better booster: compare against the best error so far, not just the pre-existing model's
    if error <= min_error:
        bst_opt = bst
        bst.save_model(modelname)
        min_error = error
print('minimum error=%f' % min_error)
## Let's now use bst_opt
## Check out the histogram by uncommenting the lines below
# fscore = bst_opt.get_fscore()
# sorted_fscore = sorted(fscore.items(), key=lambda kv: kv[1])
# for i in range(len(sorted_fscore)):
# print("\nSplit Value Histogram:")
# feature = sorted_fscore.pop()[0]
# print(feature, "=", id_to_col[int(feature[1:])])
# hg = bst_opt.get_split_value_histogram(feature)
# print(hg)
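# Flatten the tuned booster into a dataframe, one row per tree node, carrying the
# columns consumed by Tree.addNode (ID, Feature, Split, Yes, No, Missing, Gain).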
df = bst_opt.trees_to_dataframe()
# print("Trees to dataframe")
# print(df)
trees = {}
for index, row in df.iterrows():
    if row['Tree'] not in trees:  # one Tree object per booster tree number
        trees[row['Tree']] = Tree()
    # 'f<id>' feature names are translated back to their original column ids
    translated_feature = id_to_col[int(row['Feature'][1:])] if row['Feature'].startswith('f') else row['Feature']
    # print(translated_feature)
    trees[row['Tree']].addNode(row['ID'], translated_feature, row['Split'], row['Yes'], row['No'], row['Missing'],
                               row['Gain'])
results = {}
print("\nGenerated Trees:")
for t in trees.values():
results = dict_union(results, t.importanceDict())
# print(t)
print("\nImportanceDict")
print(json.dumps(results, indent=2))
print("\nSummary")
resultsJson = []
for k, v in results.items():
for kk, vv in v.items():
resultsJson.append({'feature': k, 'split': kk, 'val': vv})
how = 'greater' if vv > 0 else 'smaller'
restring = f'{k} should be {how} than {kk}'
print(restring)
with open("results.out", "w") as file:
file.write(json.dumps(resultsJson, indent=2))
# Visualize tree
# xgb.plot_tree(bst_opt)
# plt.show()