blob: 8152a03ed41da4dd41ce934e8c0652b267f0271b [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import Pipeline
from statsmodels.multivariate.manova import MANOVA
from rest_framework.exceptions import APIException
log = logging.getLogger(__name__)
def linear_regression(input_data, data):
    """Score ``input_data`` with an ordinary least-squares fit on ``data``.

    Args:
        input_data: Feature row(s) to score, laid out like ``data`` without
            the ``'risk'`` column. Only the first prediction is used.
        data: Training frame; the ``'risk'`` column is the regression target
            and every other column is a feature.

    Returns:
        dict: ``{"color": ..., "probability": ...}`` where ``probability`` is
        the raw first prediction and ``color`` is ``'green'`` when that
        prediction is above 0.5, otherwise ``'red'``.
    """
    features = data.drop(columns=['risk'])
    target = data['risk']

    model = LinearRegression()
    model.fit(features, target)

    score = model.predict(input_data)[0]
    if score > 0.5:
        color = 'green'
    else:
        color = 'red'
    return {"color": color, "probability": score}
def polynomial_regression(input_data, data):
    """Score ``input_data`` with a degree-2 polynomial regression on ``data``.

    The model is a two-stage pipeline: the features are expanded with
    ``PolynomialFeatures(degree=2)`` and then fed to a ``LinearRegression``
    that is fitted without its own intercept.

    Args:
        input_data: Feature row(s) to score, laid out like ``data`` without
            the ``'risk'`` column. Only the first prediction is used.
        data: Training frame; the ``'risk'`` column is the regression target
            and every other column is a feature.

    Returns:
        dict: ``{"color": ..., "probability": ...}`` where ``probability`` is
        the raw first prediction and ``color`` is ``'green'`` when that
        prediction is above 0.5, otherwise ``'red'``.
    """
    y = data['risk']
    x = data.drop(columns=['risk'])
    model = Pipeline([('poly', PolynomialFeatures(degree=2)),
                      ('linear', LinearRegression(fit_intercept=False))])
    # Pipeline.fit returns the fitted pipeline itself.
    reg = model.fit(x, y)
    predictions = reg.predict(input_data)
    probability = predictions[0]
    color = 'green' if probability > 0.5 else 'red'
    return {"color": color, "probability": probability}
def _wilks_lambda(endog, exog):
    """Fit a MANOVA and return Wilks' lambda for the ``'x0'`` hypothesis.

    Wilks' lambda is the first value of the first row of the ``'stat'``
    table returned by ``MANOVA.mv_test()``.
    """
    stats = np.array(MANOVA(endog=endog, exog=exog).mv_test()['x0']['stat'])
    return stats[0][0]


def manova(test_row, data, categorical):
    """Compare a test row against the "good" and "bad" groups using MANOVA.

    The test row is appended to ``data`` so that it is label-encoded and
    normalised together with the reference rows, and is then taken back out.
    Wilks' lambda is computed for each group on its own and again with the
    test row added to that group.

    Args:
        test_row: Values for one new row, in the column order of ``data``.
        data: Reference frame with integer column labels. Column ``10`` is
            the group flag (``0`` selects the "good" rows, ``1`` the "bad"
            rows) and column ``9`` is used as ``exog``; all remaining columns
            form ``endog``.
        categorical: Labels of the columns to label-encode. Every other
            column is standardised (zero mean, unit population variance).
            NOTE(review): column ``10`` presumably has to be listed here,
            otherwise it is standardised and the ``== 0`` / ``== 1`` group
            selection would match nothing - confirm against the caller.

    Returns:
        dict: ``"method"`` plus Wilks' lambda for the good group
        (``"WL_good"``), the good group with the test row
        (``"WL_test_good"``), and the same pair for the bad group
        (``"WL_bad"``, ``"WL_test_bad"``).
    """
    # Re-index after dropping incomplete rows so that ``len(data)`` is always
    # an unused label and the assignment below appends rather than overwrites.
    data = data.dropna().reset_index(drop=True)
    data.loc[len(data)] = test_row

    for col in categorical:
        data[col] = LabelEncoder().fit_transform(data[col])
    for col in data.columns:
        if col not in categorical:
            data[col] = (data[col] - np.mean(data[col])) / np.std(data[col])

    # Take the encoded/normalised test row back out. The reference data must
    # not contain it, otherwise the "group only" statistics already include
    # the row they are later compared against.
    test_row = data.iloc[-1]
    data = data.iloc[:-1]

    data_good = data[data[10] == 0]
    data_bad = data[data[10] == 1]
    x_good = data_good.drop([10, 9], axis=1)
    y_good = data_good[[9]]
    x_bad = data_bad.drop([10, 9], axis=1)
    y_bad = data_bad[[9]]

    # One-row frames so the test row can be stacked under each group with
    # pd.concat (DataFrame.append no longer exists in pandas 2.x).
    x_test = test_row.drop([10, 9]).to_frame().T
    y_test = test_row[[9]].to_frame().T

    scorecard = {
        "method": "MANOVA",
        "WL_good": _wilks_lambda(x_good, y_good),
        "WL_test_good": _wilks_lambda(pd.concat([x_good, x_test]),
                                      pd.concat([y_good, y_test])),
        "WL_bad": _wilks_lambda(x_bad, y_bad),
        "WL_test_bad": _wilks_lambda(pd.concat([x_bad, x_test]),
                                     pd.concat([y_bad, y_test])),
    }
    return scorecard
def rename_df_columns(df):
    """Return a copy of ``df`` with string column labels in snake case.

    String labels are lower-cased and their spaces replaced by underscores
    (``'Credit amount'`` -> ``'credit_amount'``). Non-string labels are kept
    unchanged. Rows, index and column dtypes are preserved, and ``df`` itself
    is not modified.

    Args:
        df: The frame whose columns should be renamed.

    Returns:
        pandas.DataFrame: A new frame with the renamed columns.
    """
    def _to_snake(label):
        if isinstance(label, str):
            return label.lower().replace(' ', '_')
        return label

    # rename() relabels the columns directly; round-tripping the frame
    # through nested dicts would collapse rows that share an index label.
    return df.rename(columns=_to_snake)
def prepare_data(data):
    """Encode and standardise ``data`` and extract its last row as the query.

    ``data`` is modified in place: the ``'job'`` column is cast to int,
    every non-numeric column is label-encoded, and every column that was
    numeric on entry is standardised to zero mean and unit population
    variance. A column with zero variance is only centred (it becomes all
    zeros) instead of being divided by zero.

    Args:
        data: Frame containing at least the ``'job'`` and ``'risk'`` columns.
            Its last row is treated as the row to be scored.

    Returns:
        tuple: ``(data, input_data)`` where ``data`` is the same, now
        transformed, frame and ``input_data`` is a one-row frame (index
        ``0``) holding the last row without the ``'risk'`` column.
    """
    data['job'] = data['job'].astype('int')

    # Public equivalent of the private DataFrame._get_numeric_data().
    numeric_cols = set(data.select_dtypes(include=['number', 'bool']).columns)
    categorical = [col for col in data.columns if col not in numeric_cols]

    for col in categorical:
        # A fresh encoder per column; nothing is shared between columns.
        data[col] = LabelEncoder().fit_transform(data[col])

    for col in data.columns:
        if col not in numeric_cols:
            continue
        centered = data[col] - data[col].mean()
        spread = data[col].std(ddof=0)
        # A constant column has zero spread; dividing would turn it into NaN.
        data[col] = centered / spread if spread else centered

    input_data = data.iloc[[-1]].drop(columns=['risk']).reset_index(drop=True)
    return data, input_data
def stat_score(input_data, model_type):
    """Score one applicant with a statistical model fitted on the German data.

    The reference data is read from ``zoo/data/german.csv``. The applicant is
    appended to it so that both are encoded and normalised together by
    ``prepare_data``, and is removed again before the model is fitted.

    Args:
        input_data: Mapping of the applicant's feature values, keyed by the
            snake-case column names. A placeholder ``'risk'`` entry is
            written into it.
        model_type: ``'linearRegression'`` or ``'polynomialRegression'``.
            ``'manova'`` is recognised but not implemented.

    Returns:
        dict | None: The model output (``"color"``, ``"probability"``) plus
        ``"method"`` set to ``model_type``. ``None`` when an unexpected error
        occurred; the error is logged at debug level.

    Raises:
        APIException: If ``model_type`` is ``'manova'`` or not a known method.
    """
    try:
        df = pd.read_csv('zoo/data/german.csv', index_col=0)
        dataset = df.drop(columns=['Saving accounts', 'Checking account'])
        # Re-index so that len(dataset) is always an unused label and the
        # applicant row below is appended as the last row.
        dataset = dataset.dropna().reset_index(drop=True)
        # rename columns(Make them lowercase and snakecase)
        dataset = rename_df_columns(dataset)

        # The applicant has no known risk yet; a placeholder value is needed
        # only so the row can be encoded together with the dataset.
        input_data['risk'] = 'bad'
        dataset.loc[len(dataset)] = input_data

        # Prepare and normalize data
        dataset, input_data = prepare_data(dataset)
        # Remove the applicant row again: its 'risk' value is a placeholder
        # and must not be used as a training label.
        dataset = dataset.iloc[:-1]

        if model_type == 'manova':
            raise APIException(
                "Statistical Method Manova is not implemented yet")
        elif model_type == 'linearRegression':
            output = linear_regression(input_data, dataset)
        elif model_type == 'polynomialRegression':
            output = polynomial_regression(input_data, dataset)
        else:
            raise APIException(
                f"Unknown statistical method: {model_type}")
        output['method'] = model_type
        return output
    except APIException:
        # Deliberate API errors must reach the client, not be swallowed below.
        raise
    except Exception as e:
        log.debug(f"An Exception Occurred; {str(e)}")
        return None