blob: 307a622470ba143c29ee00e503611b9a0c69d58d [file] [log] [blame]
#-------------------------------------------------------------
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------
m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, Matrix[Double] X_val,
Matrix[Double] Y_val, Matrix[Double] mask, Frame[Unknown] schema, Frame[Unknown] lp,
Frame[Unknown] primitives, Frame[Unknown] param, Integer k = 3, Double testAccuracy = 0.8,
Boolean isWeighted, Integer R=50, Boolean verbose = TRUE)
return (Frame[Unknown] bestPipeline, Matrix[Double] bestHyperparams, Matrix[Double] bestAccuracy)
{
print("null in data "+sum(is.na(X_train)))
# initialize output variables
hparam = list()
pipeline = list()
# initialize bandit variables
# variable names follow publication where algorithm is introduced
eta = 2 # the halving ratio is fixed to 2
s_max = floor(log(R,eta));
B = (s_max + 1) * R;
for(s in s_max:0) {
bracket_hp = list()
bracket_pipel = list()
n = ceil(floor(B/R/(s+1)) * eta^s);
r = R * eta^(-s);
configurations = get_physical_configurations(lp, n, primitives)
if(verbose)
print("n "+ n +"\n R "+ R +"\n s_max "+ s_max +"\n B "+ B +"\n n "+ n +"\n r "+ r)
for( i in 0:s ) {
# successive halving
n_i = as.integer(floor(n * eta^(-i)));
r_i = as.integer(floor(r * eta^i));
if(verbose) {
print("no of configurations ---------"+n_i)
print("no of resources --------------"+r_i)
print("iteration ---------------------"+i)
}
configurations = configurations[1:n_i, ]
[a,b] = run_with_hyperparam(configurations, r_i, X_train, Y_train, X_val, Y_val, mask,
schema, param, isWeighted, verbose)
# sort the pipelines by order of accuracy decreasing
a = frameSort(a)
b = order(target = b, by = 1, decreasing=TRUE, index.return=FALSE)
rowIndex = ifelse(nrow(a) > k, k, nrow(a))
# maintain the brackets results
bracket_pipel = append(bracket_pipel, a[1:rowIndex,])
bracket_hp = append(bracket_hp, b[1:rowIndex, ])
print("inside brackets ")
print(toString(bracket_pipel))
print("------------------")
print(toString(bracket_hp))
while(FALSE){}
}
# keep the best k results for each bracket
[bracket_bestPipeline, bracket_bestHyperparams] = extractTopK(bracket_pipel,
bracket_hp, testAccuracy, k)
pipeline = append(pipeline, bracket_bestPipeline)
hparam = append(hparam, bracket_bestHyperparams)
}
print("after all brackets ")
while(FALSE){}
print(toString(pipeline))
print("------------------")
print(toString(hparam))
while(FALSE){}
# extract best top k from all iterations
[bestPipeline, bestHyperparams] = extractTopK(pipeline, hparam, testAccuracy, k)
bestAccuracy = as.matrix(bestPipeline[, 1])
bestPipeline = bestPipeline[,2:ncol(bestPipeline)]
bestHyperparams = bestHyperparams[,2:ncol(bestHyperparams)]
if(verbose) {
print("best pipeline"+ toString(bestPipeline))
print("best hyper-parameters \n"+ toString(bestHyperparams))
print("best accuracy \n"+ toString(bestAccuracy))
print("dirty accuracy "+testAccuracy)
}
}
# this method will extract the physical pipelines for a given logical pipelines
get_physical_configurations = function(Frame[String] logical, Scalar[int] n,
Frame[Unknown] primitives)
return(Frame[String] physical)
{
# load the primitives
physical = as.frame("NaN")
outliers = primitives[,1]
mvi = primitives[,2]
noise = primitives[,3]
ci = primitives[,4]
dim = primitives[,5]
operator = as.frame(matrix(0,nrow(outliers),1)) #combine all logical primitives
for(j in 1:ncol(logical))
{
# extract the physical primitives
if(as.scalar(logical[1,j]) == "OTLR")
operator = cbind(operator, outliers);
else if(as.scalar(logical[1,j]) == "MVI")
operator = cbind(operator, mvi);
else if(as.scalar(logical[1,j]) == "NR")
operator = cbind(operator, noise);
else if(as.scalar(logical[1,j]) == "CI")
operator = cbind(operator, ci);
else if(as.scalar(logical[1,j]) == "DIM")
operator = cbind(operator, dim);
}
opt = operator[,2:ncol(operator)]
idx = seq(1, ncol(opt))
# get the indexes of columns for recode transformation
index = vectorToCsv(idx)
# recode logical pipelines for easy handling
jspecR = "{ids:true, recode:["+index+"]}";
[X, M] = transformencode(target=opt, spec=jspecR);
X = replace(target= X, pattern = NaN, replacement = 0)
paramLens = matrix(0, ncol(logical), 1);
for( j in 1:ncol(logical)) {
vect = removeEmpty(target = X[,j], margin = "rows");
paramLens[j,1] = nrow(vect);
}
paramVals = matrix(0, ncol(logical), max(paramLens));
for( j in 1:ncol(logical) ) {
vect = removeEmpty(target = X[,j], margin = "rows");
paramVals[j,1:nrow(vect)] = t(vect);
}
cumLens = rev(cumprod(rev(paramLens))/rev(paramLens));
numConfigs = n;
# materialize hyper-parameter combinations
HP = matrix(0, numConfigs, ncol(logical));
parfor( i in 1:nrow(HP) ) {
for( j in 1:ncol(logical) )
HP[i,j] = paramVals[j,as.scalar(((i-1)/cumLens[j,1])%%paramLens[j,1]+1)];
}
physical = transformdecode(target=HP, spec=jspecR, meta=M);
}
# this method will call the execute pipelines with their hyper-parameters
run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double] X, Matrix[Double] Y,
Matrix[Double] X_val, Matrix[Double] Y_val, Matrix[Double] mask, Frame[Unknown] schema,
Frame[Unknown] param, Boolean isWeighted, Boolean verbose)
return (Frame[Unknown] output_operator, Matrix[Double] output_hyperparam)
{
output_hyperparam = matrix(-1, 1, 1)
output_operator = as.frame("")
clone_X = X
clone_x_val = X_val
clone_Y = Y
clone_y_val = Y_val
for(i in 1:nrow(ph_pip))
{
# execute configurations with r resources
for(r in 1:r_i)
{
tmp_hp = matrix(0, 1, 1)
tmp_op = as.frame("")
hp = getHyperparam(ph_pip[i,], param)
[X, Y] = executePipeline(ph_pip[i], X, Y, mask, schema, hp, FALSE)
[X_val, Y_val] = executePipeline(ph_pip[i], X_val, Y_val, mask, schema, hp, FALSE)
accuracy = fclassify(X, Y, X_val, Y_val, isWeighted)
hp_vec = listToVector(hp, FALSE)
tmp_hp = cbind(cbind(as.matrix(accuracy), hp_vec), tmp_hp)
tmp_op = cbind(cbind(as.frame(accuracy), ph_pip[i]), tmp_op)
if(as.scalar(output_hyperparam[1,1]) == -1 ) {
output_hyperparam = tmp_hp
output_operator = tmp_op
}
else {
if(ncol(tmp_hp) < ncol(output_hyperparam))
tmp_hp = cbind(tmp_hp, matrix(0,1,ncol(output_hyperparam) - ncol(tmp_hp)))
else if(ncol(tmp_hp) > ncol(output_hyperparam))
output_hyperparam = cbind(output_hyperparam, matrix(0,nrow(output_hyperparam),
ncol(tmp_hp) - ncol(output_hyperparam) ))
output_hyperparam = rbind(output_hyperparam, tmp_hp)
output_operator = rbind(output_operator, tmp_op)
}
X = clone_X
X_val = clone_x_val
Y = clone_Y
Y_val = clone_y_val
while(FALSE){}
}
}
output_hyperparam = output_hyperparam[, 1:ncol(output_hyperparam) - 1]
output_operator = output_operator[, 1:ncol(output_operator) - 1]
}
# extract the hyper-parameters for pipelines
getHyperparam = function(Frame[Unknown] pipeline, Frame[Unknown] hpList)
return (List[Unknown] paramList)
{
# load the hyper-parameters values
paramList = list()
for(i in 1:ncol(pipeline)) {
op = as.scalar(pipeline[1,i])
hasParam = map(hpList[,1], "x->x.contains(\""+op+"\")")
m_hasParam = matrix(0, nrow(hasParam), 1)
# convert the boolean vector to 0/1 matrix representation
for(h in 1:nrow(hasParam))
m_hasParam[h] = ifelse(as.scalar(hasParam[h,1]) == "true",1,0)
# compute the relevant index
index = m_hasParam * seq(1, nrow(m_hasParam))
index = as.scalar(removeEmpty(target = index, margin = "rows"))
no_of_param = as.integer(as.scalar(hpList[index, 2]))
# extract hasY and verbose flags
attachY = as.matrix(hpList[index, 3])
isVerbose = as.matrix(hpList[index, 4])
if(no_of_param > 0) {
start = 5
t = 5
OpParam = matrix(0, 1, no_of_param)
for(j in 1:no_of_param) {
type = as.scalar(hpList[index, t])
paramValIndex = (no_of_param) + start
minVal = as.scalar(hpList[index, paramValIndex])
maxVal = as.scalar(hpList[index, paramValIndex + 1])
[minVal, maxVal] = verifyHp(i, pipeline, minVal, maxVal, j)
if(type == "FP") {
val = as.scalar(rand(rows=1, cols=1, min=minVal,
max=maxVal, pdf="uniform"));
OpParam[1, j] = val
}
else if(type == "INT") {
# val = ifelse(minVal == maxVal , minVal, as.scalar(sample(maxVal, 1)));
val = round(as.scalar(rand(rows=1, cols=1, min=minVal,
max=maxVal, pdf="uniform")));
OpParam[1, j] = val
}
else if(type == "BOOL") {
s = as.scalar(sample(2,1))
b = as.integer(s-1)
OpParam[1, j] = b
}
else {
# TODO handle string set something like {,,}
print("invalid data type")
}
start = start + 2
t = t + 1
}
OpParam = cbind(OpParam, attachY, isVerbose)
}
else {
OpParam = attachY
OpParam = cbind(OpParam, isVerbose)
}
while(FALSE){}
paramList = append(paramList, OpParam)
}
}
listToVector = function(List[Unknown] hp, Boolean verbose)
return (Matrix[Double] hp_vec)
{
hp_vec = matrix(0,1,1)
len = length(hp)
for(k in 1:len) {
mat = as.matrix(hp[k])
hpy = cbind(as.matrix(ncol(mat)), mat)
hp_vec = cbind(hp_vec, hpy)
}
hp_vec = hp_vec[1, 2:ncol(hp_vec)]
}
fclassify = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] X_val,
Matrix[Double] y_val, Boolean isWeighted)
return (Double accuracy)
{
K = 10
if(max(Y) == min(Y)) {
print("Y contains only one class")
accuracy = as.double(0)
}
else
{
print("STARTING "+K+" CROSS VALIDATIONS")
# do the k = 3 cross validations
accuracyMatrix = crossV(rbind(X, X_val), rbind(Y, y_val), K, isWeighted)
acc = colMeans(accuracyMatrix)
accuracy = as.scalar(acc[1,1])
print("validation accuracy "+accuracy)
}
}
crossV = function(Matrix[double] X, Matrix[double] y, Integer k, Boolean isWeighted)
return (Matrix[Double] accuracyMatrix)
{
#create empty lists
dataset_X = list(); #empty list
dataset_y = list();
fs = ceil(nrow(X)/k);
off = fs - 1;
#divide X, y into lists of k matrices
for (i in seq(1, k)) {
dataset_X = append(dataset_X, X[i*fs-off : min(i*fs, nrow(X)),]);
dataset_y = append(dataset_y, y[i*fs-off : min(i*fs, nrow(y)),]);
}
accuracyMatrix = matrix(0, k, 2)
#keep one fold for testing in each iteration
for (i in seq(1, k)) {
[tmpX, testX] = remove(dataset_X, i);
[tmpy, testy] = remove(dataset_y, i);
trainX = rbind(tmpX);
trainy = rbind(tmpy);
trainX = trainX[,1:ncol(X)] # TODO improve list size propagation
testX = as.matrix(testX)
testy = as.matrix(testy)
beta = multiLogReg(X=trainX, Y=trainy, icpt=2, reg=0.00001, tol=1e-9, maxi=50,
maxii= 50, verbose=FALSE);
[prob, yhat, a] = multiLogRegPredict(testX, beta, testy, FALSE)
accuracy = getAccuracy(testy, yhat, isWeighted)
accuracyMatrix[i, 1] = accuracy
}
}
# extract the top k pipelines
extractTopK = function(List[Unknown] pipeline, List[Unknown] hyperparam,
Double testAccuracy, Integer k)
return (Frame[Unknown] bestPipeline, Matrix[Double] bestHyperparams)
{
len = length(pipeline)
print("length "+len)
# process the pipelines
bestPipeline = as.frame(pipeline[1])
pipelineLength = ncol(bestPipeline)
if(len > 1)
{
for(i in 2:length(pipeline))
{
right = as.frame(pipeline[i, 1:pipelineLength])
bestPipeline = rbind(bestPipeline, right )
}
}
# process the hyper-parameters
pipLen = matrix(0, length(hyperparam), 1)
pipWidth = matrix(0, length(hyperparam), 1)
for(i in 1:length(hyperparam))
{
mat = as.matrix(hyperparam[i])
pipLen[i] = ncol(mat)
pipWidth[i] = nrow(mat)
}
rowLen = cumsum(pipWidth)
bestHyperparams = matrix(0, max(rowLen), max(pipLen))
start = 1
for(i in 1: length(hyperparam))
{
matSep = as.scalar(rowLen[i])
vect = as.matrix(hyperparam[i])
bestHyperparams[start:matSep, 1:ncol(vect)] = vect
start = matSep + 1
}
mask = (bestHyperparams[, 1] < testAccuracy) == 0
bestPipeline = frameRmEmpty(bestPipeline, mask)
bestHyperparams = removeEmpty(target = bestHyperparams, margin = "rows", select = mask)
bestPipeline = frameSort(bestPipeline)
bestHyperparams = order(target = bestHyperparams, by = 1, decreasing=TRUE, index.return=FALSE)
rowIndex = ifelse(nrow(bestPipeline) > k, k, nrow(bestPipeline))
bestPipeline = bestPipeline[1:rowIndex,]
bestHyperparams = bestHyperparams[1:rowIndex,]
}
# remove empty wrapper for frames
frameRmEmpty = function(Frame[Unknown] frameblock, Matrix[Double] selectMatrix)
return (Frame[Unknown] frameblock)
{
idx = seq(1, ncol(frameblock))
# get the indexes of columns for recode transformation
index = vectorToCsv(idx)
# recode logical pipelines for easy handling
jspecR = "{ids:true, recode:["+index+"]}";
[X, M] = transformencode(target=frameblock, spec=jspecR);
X = removeEmpty(target = X, margin = "rows", select = selectMatrix)
frameblock = transformdecode(target = X, spec = jspecR, meta = M)
}
# smote wrapper for doing relative over-sampling
SMOTE = function(Matrix[Double] X, Matrix[Double] Y, Boolean verbose)
return (Matrix[Double] XY)
{
XY = order(target = cbind(Y, X), by = 1, decreasing=FALSE, index.return=FALSE)
# get the class count
classes = table(Y, 1)
print("before smote")
print(toString(classes))
start_class = 1
end_class = 0
k = table(Y, 1)
getMax = max(k)
maxKIndex = as.scalar(rowIndexMax(t(k)))
outSet = matrix(0, 0, ncol(XY))
for(i in 1: nrow(k)) {
end_class = end_class + as.scalar(classes[i])
class_t = XY[start_class:end_class, ]
remainingRatio = (round(getMax/nrow(class_t)) - 1) * 100
if((i != maxKIndex)) {
# TODO implement SMOTE-NC for categorical data oversampling
synthesized = smote(class_t, remainingRatio, 1, FALSE)
outSet = rbind(outSet, synthesized)
if(verbose) {
print("max value: "+getMax)
print("values of i: "+i)
print("remaining ratio: "+remainingRatio)
}
}
start_class = end_class + 1
}
XY = rbind(XY, synthesized)
Y = XY[, 1]
X = XY[, 2:ncol(XY)]
XY = cbind(X,Y)
classes = table(Y, 1)
print("after smote")
print(toString(classes))
}
# constraints over hyper parameters
verifyHp = function(Integer index, Frame[Unknown] pip, Double minVal, Double maxVal, Integer paraNo)
return (Double minVal, Double maxVal) {
op = as.scalar(pip[1,index])
# 1. if next op is pca then current op should not leave NaNs in data
# 2. if next op is mice then current op should not replace NaNs with zeros
if((op == "outlierBySd" | op == "outlierByIQR") & index < ncol(pip) & paraNo == 2)
{
nextOp = as.scalar(pip[1, index + 1])
if(nextOp == "pca" | nextOp == "abstain" | nextOp == "SMOTE")
{
maxVal = 1.0
}
if(nextOp == "mice")
{
minVal = 2.0
}
}
# print("now min and max val ")
# print(minVal+" "+maxVal)
}
# smote_nc = function(Matrix[Double] X, Integer s = 200, Matrix[Double] mask, Integer k = 1, Boolean verbose = FALSE)
# return (Matrix[Double] Y) {
# if(s < 100 | (s%%100) != 0)
# {
# print("the number of samples should be an integral multiple of 100. Setting s = 100")
# s = 100
# }
# if(k < 1) {
# print("k should not be less than 1. Setting k value to default k = 1.")
# k = 1
# }
# # matrix to keep the index of KNN for each minority sample
# knn_index = matrix(0,k,nrow(X))
# # find nearest neighbour
# for(i in 1:nrow(X))
# {
# knn = nn(X, X[i, ], k)
# knn_index[, i] = knn
# }
# # number of synthetic samples from each minority class sample
# iter = 0
# iterLim = (s/100)
# # matrix to store synthetic samples
# synthetic_samples = matrix(0, iterLim*ncol(knn_index), ncol(X))
# # shuffle the nn indexes
# #rand_index = ifelse(k < iterLim, sample(k, iterLim, TRUE, 42), sample(k, iterLim, 42))
# if (k < iterLim)
# rand_index = sample(k, iterLim, TRUE, 42);
# else
# rand_index = sample(k, iterLim, 42);
# while(iter < iterLim)
# {
# # pick the random NN
# knn_sample = knn_index[as.scalar(rand_index[iter+1]),]
# # generate sample
# for(i in 1:ncol(knn_index))
# {
# index = as.scalar(knn_sample[1,i])
# X_diff = X[index,] - X[i, ]
# gap = as.scalar(Rand(rows=1, cols=1, min=0, max=1, seed = 42))
# X_sys = X[i, ] + (gap*X_diff)
# synthetic_samples[iter*ncol(knn_index)+i,] = X_sys;
# }
# iter = iter + 1
# }
# Y = synthetic_samples
# if(verbose)
# print(nrow(Y)+ " synthesized samples generated.")
# }
# nn = function(Matrix[Double] X, Matrix[Double] instance, Integer k )
# return (Matrix[Double] knn_)
# {
# if(nrow(X) < k)
# stop("can not pick "+k+" nearest neighbours from "+nrow(X)+" total instances")
# # compute the euclidean distance
# diff = X - instance
# square_diff = diff^2
# distance = sqrt(rowSums(square_diff))
# sort_dist = order(target = distance, by = 1, decreasing= FALSE, index.return = TRUE)
# knn_ = sort_dist[2:k+1,]
# }
downSample = function(Matrix[Double] X, matrix[Double] Y)
return (Matrix[Double] XY)
{
# find the class distribution
classes = table(Y, 1)
XY = order(target = cbind(X,Y), by = ncol(X), decreasing = FALSE, index.return = FALSE)
# take minimum class out
minRecords = min(classes)
start_class = 1
out_s = 1
out_e = 0
end_class = 0
out = matrix(0, minRecords * nrow(classes), ncol(XY))
for(i in 1:nrow(classes))
{
end_class = end_class + as.scalar(classes[i])
class_t = XY[start_class:end_class, ]
out_e = out_e + i * minRecords
out[out_s:out_e, ] = class_t[1:minRecords, ]
out_s = out_e + 1
start_class = end_class + 1
}
}