#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------
import unittest
import pandas as pd
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.regression import LinearRegression
import pyspark.sql.functions as sf
from sklearn import linear_model
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import OneHotEncoder
import slicing.base.slicer as slicer
from slicing.base import union_slicer
from slicing.base.node import Node
from slicing.base.top_k import Topk
from slicing.spark_modules import spark_utils, spark_slicer, spark_union_slicer
class SliceTests(unittest.TestCase):
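    # Shared fixture, built once at class-definition time: train a scikit-learn
    # linear model on toy_train.csv, score it on toy.csv, one-hot encode the
    # test features, and run the base slicer's first enumeration level so the
    # tests below can inspect the resulting nodes.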
loss_type = 0
# x, y = m.generate_dataset(10, 100)
train_dataset = pd.read_csv("toy_train.csv")
attributes_amount = len(train_dataset.values[0])
model = linear_model.LinearRegression()
y_train = train_dataset.iloc[:, attributes_amount - 1:attributes_amount].values
x_train = train_dataset.iloc[:, 0:attributes_amount - 1].values
model.fit(x_train, y_train)
test_dataset = pd.read_csv("toy.csv")
y_test = test_dataset.iloc[:, attributes_amount - 1:attributes_amount].values
x_test = test_dataset.iloc[:, 0:attributes_amount - 1].values
y_pred = model.predict(x_test)
print("Mean squared error: %.2f"
% mean_squared_error(y_test, y_pred))
print('r_2 statistic: %.2f' % r2_score(y_test, y_pred))
# Now that we have trained the model, we can print the coefficient of x that it has predicted
print('Coefficients: \n', model.coef_)
enc = OneHotEncoder(handle_unknown='ignore')
x = enc.fit_transform(x_test).toarray()
    # pair every encoded row and its target with its row index
    complete_x = list(enumerate(x))
    complete_y = list(enumerate(y_test))
all_features = enc.get_feature_names()
loss = mean_squared_error(y_test, y_pred)
devs = (y_pred - y_test) ** 2
    # per-row squared deviations, indexed by row number
    errors = list(enumerate(devs))
k = 5
w = 0.5
alpha = 4
top_k = Topk(k)
debug = True
b_update = True
first_level = slicer.make_first_level(all_features, list(complete_x), loss, len(complete_x), y_test, errors,
loss_type, top_k, alpha, w)
first_level_nodes = first_level[0]
slice_member = first_level_nodes[(7, 'x2_2')]
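    # End-to-end check of the Spark implementation: rebuild the toy experiment
    # with a Spark ML pipeline and verify that the join and union enumerators
    # agree on the discovered slices.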
def test_attr_spark(self):
conf = SparkConf().setAppName("toy_test").setMaster('local[2]')
num_partitions = 2
enumerator = "join"
model_type = "regression"
label = 'target'
sparkContext = SparkContext(conf=conf)
sqlContext = SQLContext(sparkContext)
train_df = sqlContext.read.csv("toy_train.csv", header='true',
inferSchema='true')
test_df = sqlContext.read.csv("toy.csv", header='true',
inferSchema='true')
# initializing stages of main transformation pipeline
stages = []
# list of categorical features for further hot-encoding
cat_features = ['a', 'b', 'c']
for feature in cat_features:
string_indexer = StringIndexer(inputCol=feature, outputCol=feature + "_index").setHandleInvalid("skip")
encoder = OneHotEncoderEstimator(inputCols=[string_indexer.getOutputCol()], outputCols=[feature + "_vec"])
encoder.setDropLast(False)
stages += [string_indexer, encoder]
assembler_inputs = [feature + "_vec" for feature in cat_features]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="assembled_inputs")
stages += [assembler]
assembler_final = VectorAssembler(inputCols=["assembled_inputs"], outputCol="features")
stages += [assembler_final]
pipeline = Pipeline(stages=stages)
        pipeline_model = pipeline.fit(train_df)
        train_df_transformed = pipeline_model.transform(train_df)
        # reuse the pipeline fitted on the training data so the test set gets
        # identical category-to-index mappings
        test_df_transformed = pipeline_model.transform(test_df)
train_df_transformed = train_df_transformed.withColumn('model_type', sf.lit(0))
test_df_transformed = test_df_transformed.withColumn('model_type', sf.lit(0))
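        # map every one-hot position to (category number, original value,
        # string-indexer id, global feature position) so slices can be decoded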
decode_dict = {}
counter = 0
cat = 0
for feature in cat_features:
            col_idx = test_df_transformed.select(feature, feature + "_index").distinct().rdd.collectAsMap()
            col_idx = {k: v for k, v in sorted(col_idx.items(), key=lambda item: item[1])}
            for item in col_idx:
                decode_dict[counter] = (cat, item, col_idx[item], counter)
                counter = counter + 1
            cat = cat + 1
train_df_transform_fin = train_df_transformed.select('features', label, 'model_type')
test_df_transform_fin = test_df_transformed.select('features', label, 'model_type')
lr = LinearRegression(featuresCol='features', labelCol=label, maxIter=10, regParam=0.0, elasticNetParam=0.8)
lr_model = lr.fit(train_df_transform_fin)
        eval_result = lr_model.evaluate(test_df_transform_fin)
        f_l2 = eval_result.meanSquaredError
        pred = eval_result.predictions
pred_df_fin = pred.withColumn('error', spark_utils.calc_loss(pred[label], pred['prediction'], pred['model_type']))
predictions = pred_df_fin.select('features', 'error').repartition(num_partitions)
all_features = list(decode_dict)
predictions = predictions.collect()
spark_join = spark_slicer.parallel_process(all_features, predictions, f_l2, sparkContext, debug=self.debug, alpha=self.alpha,
k=self.k, w=self.w, loss_type=self.loss_type, enumerator="join")
spark_union = spark_union_slicer.process(all_features, predictions, f_l2, sparkContext, debug=self.debug, alpha=self.alpha,
k=self.k, w=self.w, loss_type=self.loss_type, enumerator="union")
self.assertEqual(3, len(spark_join.slices))
print("check1")
self.assertEqual(spark_join.min_score, spark_union.min_score)
print("check2")
self.assertEqual(spark_join.keys, spark_union.keys)
print("check3")
self.assertEqual(len(spark_join.slices), len(spark_union.slices))
print("check4")
        for join_slice, union_slice in zip(spark_join.slices, spark_union.slices):
            self.assertEqual(join_slice.score, union_slice.score)
print("check5")
def test_features_number(self):
self.assertEqual(len(self.all_features), 9)
print("check 1")
def test_base_first_level(self):
self.assertEqual(9, len(self.first_level_nodes))
print("check 2")
def test_parents_first(self):
self.assertIn(('x2_2', 7), self.slice_member.parents)
print("check 3")
def test_name(self):
self.assertEqual('x2_2', self.slice_member.make_name())
print("check 4")
def test_size(self):
self.assertEqual(36, self.slice_member.size)
print("check 5")
def test_e_upper(self):
self.assertEqual(81, self.slice_member.e_upper)
print("check 6")
def test_loss(self):
self.assertEqual(22, int(self.slice_member.loss))
print("check 7")
    def test_opt_fun(self):
        self.slice_member.score = slicer.opt_fun(self.slice_member.loss, self.slice_member.size, self.loss, len(self.x_test), self.w)
        # without an assertion this test could never fail; check the recomputed
        # score against the value expected in test_score
        self.assertEqual(1.2673015873015872, self.slice_member.score)
        print("check 8")
def test_score(self):
self.assertEqual(1.2673015873015872, self.slice_member.score)
print("check 9")
def test_base_join_enum(self):
cur_lvl_nodes = {}
all_nodes = {}
b_update = True
cur_lvl = 1
slice_index = (2, 'x0_3')
combined = slicer.join_enum(slice_index, self.first_level_nodes, self.complete_x, self.loss,
len(self.complete_x), self.y_test, self.errors, self.debug, self.alpha, self.w,
self.loss_type, b_update, cur_lvl, all_nodes, self.top_k, cur_lvl_nodes)
self.assertEqual(6, len(combined[0]))
print("check1")
def test_parents_second(self):
cur_lvl_nodes = {}
all_nodes = {}
b_update = True
cur_lvl = 1
slice_index = (2, 'x0_3')
combined = slicer.join_enum(slice_index, self.first_level_nodes, self.complete_x, self.loss,
len(self.complete_x), self.y_test, self.errors, self.debug, self.alpha, self.w,
self.loss_type, b_update, cur_lvl, all_nodes, self.top_k, cur_lvl_nodes)
        parent1 = combined[0]['x0_3 && x1_3']
        parent2 = combined[0]['x0_3 && x2_2']
new_node = Node(self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors)
new_node.parents = [parent1, parent2]
parent1_attr = parent1.attributes
parent2_attr = parent2.attributes
new_node_attr = slicer.union(parent1_attr, parent2_attr)
self.assertEqual(new_node_attr, [('x0_3', 2), ('x1_3', 5), ('x2_2', 7)])
print("check2")
def test_nonsense(self):
cur_lvl_nodes = {}
all_nodes = {}
b_update = True
cur_lvl = 1
slice_index = (2, 'x0_3')
combined = slicer.join_enum(slice_index, self.first_level_nodes, self.complete_x, self.loss,
len(self.complete_x), self.y_test, self.errors, self.debug, self.alpha, self.w,
self.loss_type, b_update, cur_lvl, all_nodes, self.top_k, cur_lvl_nodes)
        parent1 = combined[0]['x0_3 && x1_3']
        parent2 = combined[0]['x0_3 && x2_2']
new_node = Node(self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors)
new_node.parents = [parent1, parent2]
parent1_attr = parent1.attributes
parent2_attr = parent2.attributes
new_node_attr = slicer.union(parent1_attr, parent2_attr)
new_node.attributes = new_node_attr
new_node.name = new_node.make_name()
        flag_true = slicer.slice_name_nonsense(parent1, parent2, 2)
        self.assertTrue(flag_true)
print("check3")
def test_non_nonsense(self):
cur_lvl_nodes = {}
all_nodes = {}
b_update = True
cur_lvl = 1
slice_index = (2, 'x0_3')
parent3 = Node(self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors)
parent3.parents = [self.first_level_nodes[(4, 'x1_2')], self.first_level_nodes[(7, 'x2_2')]]
parent3.attributes = [('x1_2', 4), ('x2_2', 7)]
combined = slicer.join_enum(slice_index, self.first_level_nodes, self.complete_x, self.loss,
len(self.complete_x), self.y_test, self.errors, self.debug, self.alpha, self.w,
self.loss_type, b_update, cur_lvl, all_nodes, self.top_k, cur_lvl_nodes)
parent2 = combined[0]['x0_3 && x2_3']
parent3.key = (8, 'x1_2 && x2_2')
        flag_nonsense = slicer.slice_name_nonsense(parent2, parent3, 2)
        self.assertTrue(flag_nonsense)
print("check4")
def test_uppers(self):
cur_lvl_nodes = {}
all_nodes = {}
b_update = True
cur_lvl = 1
slice_index = (2, 'x0_3')
parent3 = Node(self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors)
parent3.parents = [self.first_level_nodes[(4, 'x1_2')], self.first_level_nodes[(7, 'x2_2')]]
parent3.attributes = [('x1_2', 4), ('x2_2', 7)]
combined = slicer.join_enum(slice_index, self.first_level_nodes, self.complete_x, self.loss,
len(self.complete_x), self.y_test, self.errors, self.debug, self.alpha, self.w,
self.loss_type, b_update, cur_lvl, all_nodes, self.top_k, cur_lvl_nodes)
parent1 = combined[0]['x0_3 && x1_3']
parent2 = combined[0]['x0_3 && x2_3']
new_node = Node(self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors)
new_node.parents = [parent1, parent2]
new_node.calc_bounds(2, self.w)
self.assertEqual(25, new_node.s_upper)
print("check5")
self.assertEqual(398, int(new_node.c_upper))
print("check6")
def test_topk_slicing(self):
join_top_k = slicer.process(self.all_features, self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors,
self.debug, self.alpha, self.k, self.w, self.loss_type, self.b_update)
union_top_k = union_slicer.process(self.all_features, self.complete_x, self.loss, len(self.complete_x), self.y_test, self.errors,
self.debug, self.alpha, self.k, self.w, self.loss_type, self.b_update)
self.assertEqual(join_top_k.min_score, union_top_k.min_score)
print("check1")
self.assertEqual(join_top_k.keys, union_top_k.keys)
print("check2")
self.assertEqual(len(join_top_k.slices), len(union_top_k.slices))
print("check3")
        for join_slice, union_slice in zip(join_top_k.slices, union_top_k.slices):
            self.assertEqual(join_slice.score, union_slice.score)
print("check4")
def test_extreme_target(self):
test_dataset = pd.read_csv("/home/lana/diploma/project/slicing/datasets/toy_extreme_change.csv")
y_test = test_dataset.iloc[:, self.attributes_amount - 1:self.attributes_amount].values
x_test = test_dataset.iloc[:, 0:self.attributes_amount - 1].values
y_pred = self.model.predict(x_test)
print("Mean squared error: %.2f"
% mean_squared_error(y_test, y_pred))
print('r_2 statistic: %.2f' % r2_score(y_test, y_pred))
# Now that we have trained the model, we can print the coefficient of x that it has predicted
print('Coefficients: \n', self.model.coef_)
enc = OneHotEncoder(handle_unknown='ignore')
x = enc.fit_transform(x_test).toarray()
        complete_x = list(enumerate(x))
        complete_y = list(enumerate(y_test))
all_features = enc.get_feature_names()
loss = mean_squared_error(y_test, y_pred)
devs = (y_pred - y_test) ** 2
        errors = list(enumerate(devs))
k = 5
w = 0.5
alpha = 4
top_k = Topk(k)
debug = True
b_update = True
first_level = slicer.make_first_level(all_features, list(complete_x), loss, len(complete_x), y_test, errors,
self.loss_type, top_k, alpha, w)
first_level_nodes = first_level[0]
slice_member = first_level_nodes[(7, 'x2_2')]
self.assertGreater(slice_member.loss, self.slice_member.loss)
print("check 1")
self.assertGreater(slice_member.score, self.slice_member.score)
print("check 2")
def test_error_significance(self):
y_test = self.test_dataset.iloc[:, self.attributes_amount - 1:self.attributes_amount].values
x_test = self.test_dataset.iloc[:, 0:self.attributes_amount - 1].values
y_pred = self.model.predict(x_test)
print("Mean squared error: %.2f"
% mean_squared_error(y_test, y_pred))
print('r_2 statistic: %.2f' % r2_score(y_test, y_pred))
# Now that we have trained the model, we can print the coefficient of x that it has predicted
print('Coefficients: \n', self.model.coef_)
enc = OneHotEncoder(handle_unknown='ignore')
x = enc.fit_transform(x_test).toarray()
        complete_x = list(enumerate(x))
        complete_y = list(enumerate(y_test))
all_features = enc.get_feature_names()
loss = mean_squared_error(y_test, y_pred)
devs = (y_pred - y_test) ** 2
        errors = list(enumerate(devs))
k = 5
        # w = 0 puts all weight on the size term of the scoring function
        w = 0
alpha = 4
top_k = Topk(k)
debug = True
b_update = True
first_level = slicer.make_first_level(all_features, list(complete_x), loss, len(complete_x), y_test, errors,
self.loss_type, top_k, alpha, w)
first_level_nodes = first_level[0]
slice_member = first_level_nodes[(7, 'x2_2')]
self.assertGreater(self.slice_member.score, slice_member.score)
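    # With all weight on the error term (w = 1) the x2_2 slice should score
    # higher than under the default weighting (w = 0.5).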
def test_size_significance(self):
y_test = self.test_dataset.iloc[:, self.attributes_amount - 1:self.attributes_amount].values
x_test = self.test_dataset.iloc[:, 0:self.attributes_amount - 1].values
y_pred = self.model.predict(x_test)
print("Mean squared error: %.2f"
% mean_squared_error(y_test, y_pred))
print('r_2 statistic: %.2f' % r2_score(y_test, y_pred))
# Now that we have trained the model, we can print the coefficient of x that it has predicted
print('Coefficients: \n', self.model.coef_)
enc = OneHotEncoder(handle_unknown='ignore')
x = enc.fit_transform(x_test).toarray()
        complete_x = list(enumerate(x))
        complete_y = list(enumerate(y_test))
all_features = enc.get_feature_names()
loss = mean_squared_error(y_test, y_pred)
devs = (y_pred - y_test) ** 2
        errors = list(enumerate(devs))
k = 5
        # w = 1 puts all weight on the error term of the scoring function
        w = 1
alpha = 4
top_k = Topk(k)
debug = True
b_update = True
first_level = slicer.make_first_level(all_features, list(complete_x), loss, len(complete_x), y_test, errors,
self.loss_type, top_k, alpha, w)
first_level_nodes = first_level[0]
slice_member = first_level_nodes[(7, 'x2_2')]
self.assertGreater(slice_member.score, self.slice_member.score)
if __name__ == '__main__':
unittest.main()