blob: ff5c5d203c67eaf31f854b282d394f9b86a53afe [file] [log] [blame]
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------
X = read($1, data_type="frame", format="csv", header=TRUE,
naStrings= ["NA", "null"," ","NaN", "nan", "", "?"]);
meta = read($2, data_type="frame", format="csv", header=FALSE);
primitives = read($3, data_type = "frame", format="csv", header= TRUE)
param = read($4, data_type = "frame", format="csv", header= FALSE)
sample = 0.4
k = 5
R = 40
# accuracy flag
weightedAccuracy = TRUE
# TODO do it automatically
# Generate logical Pipeline
logical = as.frame("OTLR")
logical = cbind(logical, as.frame("MVI"))
# logical = cbind(logical, as.frame("CI"))
# logical = cbind(logical, as.frame("NR"))
[result, res] = startCleaning(X, logical, "classification", meta, primitives, param, k,
sample, weightedAccuracy, R, TRUE)
output = as.logical(as.scalar(res[1,1] < res[1,2]))
write(output, $5, format = "text")
str = "meta info \n"
str = append(str, "pipeline length "+k)
str = append(str,"sample size of data "+sample)
startCleaning = function(Frame[Unknown] F, Frame[Unknown] logical, String targetApplication = "classification",
Frame[Unknown] metaInfo, Frame[Unknown] primitives, Frame[Unknown] param, Integer k, Double sample,
Boolean isWeighted = TRUE, Integer R=50, Boolean verbose = FALSE)
return (Frame[Unknown] result, Matrix[Double] res)
{
res = as.matrix(0)
result = as.frame(0)
if(nrow(metaInfo) < 3)
stop("incomplete meta info")
# initialize variables
eX = as.matrix(0)
eY = as.matrix(0)
getSchema = metaInfo[1, 2:ncol(metaInfo)]
getMask = as.matrix(metaInfo[2, 2:ncol(metaInfo)])
# validate schema
X = dropInvalidType(F, getSchema)
if(sum(getMask) > 0 )
{
idx = getMask * t(seq(1, ncol(X)))
idx = removeEmpty(target = idx, margin = "cols")
index = vectorToCsv(idx)
jspecR = "{ids:true, recode:["+index+"]}"
[eX, X_meta] = transformencode(target=X, spec=jspecR);
# change the schema
getSchema = map(getSchema, "x->x.replace(\"STRING\", \"INT64\")")
getSchema = map(getSchema, "x->x.replace(\"BOOLEAN\", \"INT64\")")
} else {
eX = as.matrix(X)
}
eY = eX[, ncol(X)]
eX = eX[, 1:ncol(X) - 1]
if(min(eY) == 0)
eY = eY + 1 # add one to class labels so that the min label is greater then zero
getMask = getMask[, 1:ncol(getMask) - 1] # strip the mask of class label
getSchema = getSchema[, 1:ncol(getSchema) - 1] # strip the mask of class label
[eX, eY] = doSample(eX, eY, sample)
# get train test and validation set with balanced class distribution
[X_trainset, y_trainset, X_test, y_test] = getDataSplits(eX, eY, 0.6, FALSE)
[X_train, y_train, X_val, y_val] = getDataSplits(X_trainset, y_trainset, 0.7, FALSE)
if(verbose) {
print("classes in each split ")
trc = table(y_train, 1)
print("class distribution in train \n"+toString(trc))
tsc = table(y_test, 1)
print("class distribution in test \n"+toString(tsc))
tvc = table(y_val, 1)
print("class distribution in validation \n"+toString(tvc))
}
# get test accuracy first
# classify without cleaning
train_tmp = replace(target = X_train, pattern = NaN, replacement=0)
test_tmp = replace(target = X_test, pattern = NaN, replacement=0)
betas = multiLogReg(X=train_tmp, Y=y_train, icpt=2, tol=1e-9, reg=0.00001, maxi=50,
maxii=50, verbose=FALSE)
[d_prob, d_yhat, d_accuracy] = multiLogRegPredict(test_tmp, betas, y_test, FALSE)
d_accuracy = getAccuracy(y_test, d_yhat, isWeighted)
if(verbose)
print("dirty accuracy "+d_accuracy)
pip = as.frame("")
hp = as.matrix(0)
acc = as.matrix(0)
[pip, hp, acc] = bandit(X_train, y_train, X_val, y_val, getMask, getSchema, logical, primitives,
param, k, d_accuracy, isWeighted, R, TRUE);
tmp_hp = cbind(matrix(NaN, nrow(hp), 1), hp)
result = cbind(pip, as.frame(tmp_hp))
result = cbind(result, as.frame(acc))
if(as.scalar((is.na(acc[1,1]))) == 1) {
stop("warning: no best pipeline found")
}
res = testBestPipeline(pip[1,], hp[1,], X_train, y_train, X_test, y_test, getMask,
getSchema, as.scalar(acc[1,1]))
}
testBestPipeline = function(Frame[Unknown] pip, Matrix[Double] hp, Matrix[Double] X_train, Matrix[Double] y_train,
Matrix[Double] X_test, Matrix[Double] y_test, Matrix[Double] cmask, Frame[Unknown] schema, Double valAcc)
return (Matrix[Double] result) {
ls = list();
i = 1
k = 1
# construct the parameter list for best hyper-parameters
while(k <= ncol(pip))
{
end = as.integer(i+as.integer(as.scalar(hp[1,i])))
mat = hp[1, i+1:end]
i = end + 1
ls = append(ls, mat)
k = k + 1
}
clone_X_train = X_train
clone_X_test = X_test
X_train = replace(target = X_train, pattern = NaN, replacement = 0)
X_test = replace(target = X_test, pattern = NaN, replacement = 0)
# classify without cleaning
betas = multiLogReg(X=X_train, Y=y_train, icpt=2, tol=1e-9, reg=0.00001, maxi=50,
maxii=50, verbose=FALSE)
[d_prob, d_yhat, d_accuracy] = multiLogRegPredict(X_test, betas, y_test, FALSE)
[confusionCount_d, confusionAVG_d] = confusionMatrix(P=d_yhat, Y=y_test)
print("dirty confusion matrix \n"+toString(confusionCount_d))
X_train = clone_X_train
X_test = clone_X_test
# clean using best pipeline
[X_train_clean, Y_train_clean] = executePipeline(pip, X_train, y_train, cmask, schema, ls, FALSE)
[X_test_clean, Y_test_clean] = executePipeline(pip, X_test, y_test, cmask, schema, ls, FALSE)
while(FALSE){}
# classify after cleaning
betas = multiLogReg(X=X_train_clean, Y=Y_train_clean, icpt=2, tol=1e-9, reg=0.00001, maxi=50,
maxii=50, verbose=FALSE)
[c_prob, c_yhat, c_accuracy] = multiLogRegPredict(X_test_clean, betas, Y_test_clean, FALSE)
[confusionCount_c, confusionAVG_c] = confusionMatrix(P=c_yhat, Y=Y_test_clean)
print("accuracy of dirty accuracy "+d_accuracy)
print("accuracy of val accuracy "+valAcc)
print("accuracy of test accuracy "+c_accuracy)
print("clean confusion matrix \n"+toString(confusionCount_c))
result = cbind(as.matrix(d_accuracy), as.matrix(c_accuracy))
}
# stratified sampling
doSample = function(Matrix[Double] eX, Matrix[Double] eY, Double ratio)
return (Matrix[Double] eX, Matrix[Double] eY)
{
MIN_SAMPLE = 10000
sampled = floor(nrow(eX) * ratio)
sample = ifelse(sampled > MIN_SAMPLE, TRUE, FALSE)
if(sample)
{
XY = order(target = cbind(eY, eX), by = 1, decreasing=FALSE, index.return=FALSE)
# get the class count
classes = table(eY, 1)
print("classes")
print(toString(classes))
while(FALSE){}
start_class = 1
out_s = 1
out_e = 0
end_class = 0
out = matrix(0, sampled, ncol(XY))
classes_ratio = floor(classes*ratio)
print("class ratio "+toString(classes_ratio))
for(i in 1:nrow(classes))
{
end_class = end_class + as.scalar(classes[i])
class_t = XY[start_class:end_class, ]
out_e = out_e + as.scalar(classes_ratio[i])
out[out_s:out_e, ] = class_t[1:as.scalar(classes_ratio[i]), ]
out_s = out_e + 1
start_class = end_class + 1
}
out = removeEmpty(target = out, margin = "rows")
eY = out[, 1]
eX = out[, 2:ncol(out)]
}
}
# incomplete implementation of automatic logical pipelines
generateLogical = function(Matrix[Double] X, Matrix[Double] Y)
{
logical = as.frame("")
# get the stats
no_of_mv = sum(is.na(X))
X = replace(target= X, pattern = NaN, replacement = 0)
colMin = colMins(X)
colMax = colMaxs(X)
colMean = colMeans(X)
colSd = colSds(X)
count3sdplus = sum(X > (colMean + 3*colSd ))
count3sdminus = sum(X < (colMean - 3*colSd ))
outliers = count3sdplus + count3sdminus
ctab = table(Y, 1)
minCatPer = min(ctab) / nrow(ctab)
maxCat = max(ctab) / nrow(ctab)
mv_to_data_ratio = no_of_mv/(nrow(X) * ncol(X))
out_to_data_ratio = outliers/ (nrow(X) * ncol(X))
if(mv_to_data_ratio > 0.1)
logical = cbind(logical, as.frame("MVI"))
if(out_to_data_ratio > 0.1)
logical = cbind(logical, as.frame("OTLR"))
if((maxCat - minCatPer) > 0.3)
logical = cbind(logical, as.frame("CI"))
}
# stratified splitting
getDataSplits = function(Matrix[Double] X, Matrix[Double] Y, Double splitRatio, Boolean verbose)
return (Matrix[Double] X_train, Matrix[Double] y_train, Matrix[Double] X_test,
Matrix[Double] y_test)
{
XY = order(target = cbind(Y, X), by = 1, decreasing=FALSE, index.return=FALSE)
# get the class count
classes = table(Y, 1)
split = floor(nrow(X) * splitRatio)
start_class = 1
train_row_s = 1
test_row_s = 1
train_row_e = 0
test_row_e = 0
end_class = 0
outTrain = matrix(0, split+nrow(classes), ncol(XY))
outTest = matrix(0, (nrow(X) - split)+nrow(classes), ncol(XY))
classes_ratio_train = floor(classes*splitRatio)
classes_ratio_test = classes - classes_ratio_train
if(verbose) {
print("rows "+nrow(X))
print("classes \n"+toString(classes))
print("train ratio \n"+toString(classes_ratio_train))
print("test ratio \n"+toString(classes_ratio_test))
}
for(i in 1:nrow(classes))
{
end_class = end_class + as.scalar(classes[i])
class_t = XY[start_class:end_class, ]
train_row_e = train_row_e + as.scalar(classes_ratio_train[i])
test_row_e = test_row_e + as.scalar(classes_ratio_test[i])
outTrain[train_row_s:train_row_e, ] = class_t[1:as.scalar(classes_ratio_train[i]), ]
outTest[test_row_s:test_row_e, ] = class_t[as.scalar(classes_ratio_train[i])+1:nrow(class_t), ]
train_row_s = train_row_e + 1
test_row_s = test_row_e + 1
start_class = end_class + 1
}
outTrain = removeEmpty(target = outTrain, margin = "rows")
outTest = removeEmpty(target = outTest, margin = "rows")
y_train = outTrain[, 1]
X_train = outTrain[, 2:ncol(outTrain)]
y_test = outTest[, 1]
X_test = outTest[, 2:ncol(outTest)]
}
# entropy calculation for finding class imbalance
getBalanceScore = function(Matrix[Double] Y)
return (Matrix[Double] isBalanced)
{
# get count of instances in each class
k = table(Y, 1)
n = nrow(Y)
# compute Shannon entropy for i to k H = − ∑ ci/n * log(ci/n)
H = -sum((k/n) * log(k/n))
# Balance = H / log(k) return 0 for unbalance data and 1 for balanced data
isBalanced = H / log(k)
}