| #------------------------------------------------------------- |
| #------------------------------------------------------------- |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| #------------------------------------------------------------- |
| |
| m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, Matrix[Double] X_val, |
| Matrix[Double] Y_val, Matrix[Double] mask, Frame[Unknown] schema, Frame[Unknown] lp, |
| Frame[Unknown] primitives, Frame[Unknown] param, Integer k = 3, Double testAccuracy = 0.8, |
| Boolean isWeighted, Integer R=50, Boolean verbose = TRUE) |
| return (Frame[Unknown] bestPipeline, Matrix[Double] bestHyperparams, Matrix[Double] bestAccuracy) |
| { |
| print("null in data "+sum(is.na(X_train))) |
| # initialize output variables |
| hparam = list() |
| pipeline = list() |
| |
| # initialize bandit variables |
| # variable names follow publication where algorithm is introduced |
| eta = 2 # the halving ratio is fixed to 2 |
| s_max = floor(log(R,eta)); |
| B = (s_max + 1) * R; |
| |
| for(s in s_max:0) { |
| |
| bracket_hp = list() |
| bracket_pipel = list() |
| |
| n = ceil(floor(B/R/(s+1)) * eta^s); |
| r = R * eta^(-s); |
| configurations = get_physical_configurations(lp, n, primitives) |
| |
| if(verbose) |
| print("n "+ n +"\n R "+ R +"\n s_max "+ s_max +"\n B "+ B +"\n n "+ n +"\n r "+ r) |
| |
| for( i in 0:s ) { |
| # successive halving |
| n_i = as.integer(floor(n * eta^(-i))); |
| r_i = as.integer(floor(r * eta^i)); |
| |
| if(verbose) { |
| print("no of configurations ---------"+n_i) |
| print("no of resources --------------"+r_i) |
| print("iteration ---------------------"+i) |
| } |
| configurations = configurations[1:n_i, ] |
| [a,b] = run_with_hyperparam(configurations, r_i, X_train, Y_train, X_val, Y_val, mask, |
| schema, param, isWeighted, verbose) |
| # sort the pipelines by order of accuracy decreasing |
| a = frameSort(a) |
| b = order(target = b, by = 1, decreasing=TRUE, index.return=FALSE) |
| |
| rowIndex = ifelse(nrow(a) > k, k, nrow(a)) |
| # maintain the brackets results |
| bracket_pipel = append(bracket_pipel, a[1:rowIndex,]) |
| bracket_hp = append(bracket_hp, b[1:rowIndex, ]) |
| print("inside brackets ") |
| print(toString(bracket_pipel)) |
| print("------------------") |
| print(toString(bracket_hp)) |
| while(FALSE){} |
| } |
| |
| |
| # keep the best k results for each bracket |
| [bracket_bestPipeline, bracket_bestHyperparams] = extractTopK(bracket_pipel, |
| bracket_hp, testAccuracy, k) |
| pipeline = append(pipeline, bracket_bestPipeline) |
| hparam = append(hparam, bracket_bestHyperparams) |
| } |
| print("after all brackets ") |
| while(FALSE){} |
| print(toString(pipeline)) |
| print("------------------") |
| print(toString(hparam)) |
| while(FALSE){} |
| # extract best top k from all iterations |
| [bestPipeline, bestHyperparams] = extractTopK(pipeline, hparam, testAccuracy, k) |
| bestAccuracy = as.matrix(bestPipeline[, 1]) |
| bestPipeline = bestPipeline[,2:ncol(bestPipeline)] |
| bestHyperparams = bestHyperparams[,2:ncol(bestHyperparams)] |
| |
| if(verbose) { |
| print("best pipeline"+ toString(bestPipeline)) |
| print("best hyper-parameters \n"+ toString(bestHyperparams)) |
| print("best accuracy \n"+ toString(bestAccuracy)) |
| print("dirty accuracy "+testAccuracy) |
| } |
| } |
| |
| |
| # this method will extract the physical pipelines for a given logical pipelines |
| |
| get_physical_configurations = function(Frame[String] logical, Scalar[int] n, |
| Frame[Unknown] primitives) |
| return(Frame[String] physical) |
| { |
| # load the primitives |
| physical = as.frame("NaN") |
| outliers = primitives[,1] |
| mvi = primitives[,2] |
| noise = primitives[,3] |
| ci = primitives[,4] |
| dim = primitives[,5] |
| |
| |
| operator = as.frame(matrix(0,nrow(outliers),1)) #combine all logical primitives |
| for(j in 1:ncol(logical)) |
| { |
| # extract the physical primitives |
| if(as.scalar(logical[1,j]) == "OTLR") |
| operator = cbind(operator, outliers); |
| else if(as.scalar(logical[1,j]) == "MVI") |
| operator = cbind(operator, mvi); |
| else if(as.scalar(logical[1,j]) == "NR") |
| operator = cbind(operator, noise); |
| else if(as.scalar(logical[1,j]) == "CI") |
| operator = cbind(operator, ci); |
| else if(as.scalar(logical[1,j]) == "DIM") |
| operator = cbind(operator, dim); |
| } |
| opt = operator[,2:ncol(operator)] |
| |
| idx = seq(1, ncol(opt)) |
| # get the indexes of columns for recode transformation |
| index = vectorToCsv(idx) |
| # recode logical pipelines for easy handling |
| jspecR = "{ids:true, recode:["+index+"]}"; |
| [X, M] = transformencode(target=opt, spec=jspecR); |
| X = replace(target= X, pattern = NaN, replacement = 0) |
| |
| paramLens = matrix(0, ncol(logical), 1); |
| for( j in 1:ncol(logical)) { |
| vect = removeEmpty(target = X[,j], margin = "rows"); |
| paramLens[j,1] = nrow(vect); |
| } |
| paramVals = matrix(0, ncol(logical), max(paramLens)); |
| for( j in 1:ncol(logical) ) { |
| vect = removeEmpty(target = X[,j], margin = "rows"); |
| paramVals[j,1:nrow(vect)] = t(vect); |
| } |
| cumLens = rev(cumprod(rev(paramLens))/rev(paramLens)); |
| numConfigs = n; |
| # materialize hyper-parameter combinations |
| HP = matrix(0, numConfigs, ncol(logical)); |
| parfor( i in 1:nrow(HP) ) { |
| for( j in 1:ncol(logical) ) |
| HP[i,j] = paramVals[j,as.scalar(((i-1)/cumLens[j,1])%%paramLens[j,1]+1)]; |
| } |
| |
| physical = transformdecode(target=HP, spec=jspecR, meta=M); |
| } |
| |
| # this method will call the execute pipelines with their hyper-parameters |
| run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double] X, Matrix[Double] Y, |
| Matrix[Double] X_val, Matrix[Double] Y_val, Matrix[Double] mask, Frame[Unknown] schema, |
| Frame[Unknown] param, Boolean isWeighted, Boolean verbose) |
| return (Frame[Unknown] output_operator, Matrix[Double] output_hyperparam) |
| { |
| output_hyperparam = matrix(-1, 1, 1) |
| output_operator = as.frame("") |
| clone_X = X |
| clone_x_val = X_val |
| clone_Y = Y |
| clone_y_val = Y_val |
| for(i in 1:nrow(ph_pip)) |
| { |
| # execute configurations with r resources |
| for(r in 1:r_i) |
| { |
| tmp_hp = matrix(0, 1, 1) |
| tmp_op = as.frame("") |
| |
| hp = getHyperparam(ph_pip[i,], param) |
| [X, Y] = executePipeline(ph_pip[i], X, Y, mask, schema, hp, FALSE) |
| [X_val, Y_val] = executePipeline(ph_pip[i], X_val, Y_val, mask, schema, hp, FALSE) |
| accuracy = fclassify(X, Y, X_val, Y_val, isWeighted) |
| |
| hp_vec = listToVector(hp, FALSE) |
| tmp_hp = cbind(cbind(as.matrix(accuracy), hp_vec), tmp_hp) |
| tmp_op = cbind(cbind(as.frame(accuracy), ph_pip[i]), tmp_op) |
| if(as.scalar(output_hyperparam[1,1]) == -1 ) { |
| output_hyperparam = tmp_hp |
| output_operator = tmp_op |
| } |
| else { |
| if(ncol(tmp_hp) < ncol(output_hyperparam)) |
| tmp_hp = cbind(tmp_hp, matrix(0,1,ncol(output_hyperparam) - ncol(tmp_hp))) |
| else if(ncol(tmp_hp) > ncol(output_hyperparam)) |
| output_hyperparam = cbind(output_hyperparam, matrix(0,nrow(output_hyperparam), |
| ncol(tmp_hp) - ncol(output_hyperparam) )) |
| |
| output_hyperparam = rbind(output_hyperparam, tmp_hp) |
| output_operator = rbind(output_operator, tmp_op) |
| } |
| X = clone_X |
| X_val = clone_x_val |
| Y = clone_Y |
| Y_val = clone_y_val |
| while(FALSE){} |
| } |
| |
| } |
| output_hyperparam = output_hyperparam[, 1:ncol(output_hyperparam) - 1] |
| output_operator = output_operator[, 1:ncol(output_operator) - 1] |
| |
| } |
| |
| # extract the hyper-parameters for pipelines |
| getHyperparam = function(Frame[Unknown] pipeline, Frame[Unknown] hpList) |
| return (List[Unknown] paramList) |
| { |
| # load the hyper-parameters values |
| paramList = list() |
| for(i in 1:ncol(pipeline)) { |
| op = as.scalar(pipeline[1,i]) |
| hasParam = map(hpList[,1], "x->x.contains(\""+op+"\")") |
| m_hasParam = matrix(0, nrow(hasParam), 1) |
| |
| # convert the boolean vector to 0/1 matrix representation |
| for(h in 1:nrow(hasParam)) |
| m_hasParam[h] = ifelse(as.scalar(hasParam[h,1]) == "true",1,0) |
| # compute the relevant index |
| index = m_hasParam * seq(1, nrow(m_hasParam)) |
| index = as.scalar(removeEmpty(target = index, margin = "rows")) |
| no_of_param = as.integer(as.scalar(hpList[index, 2])) |
| |
| # extract hasY and verbose flags |
| attachY = as.matrix(hpList[index, 3]) |
| isVerbose = as.matrix(hpList[index, 4]) |
| |
| if(no_of_param > 0) { |
| start = 5 |
| t = 5 |
| OpParam = matrix(0, 1, no_of_param) |
| for(j in 1:no_of_param) { |
| type = as.scalar(hpList[index, t]) |
| paramValIndex = (no_of_param) + start |
| minVal = as.scalar(hpList[index, paramValIndex]) |
| maxVal = as.scalar(hpList[index, paramValIndex + 1]) |
| [minVal, maxVal] = verifyHp(i, pipeline, minVal, maxVal, j) |
| if(type == "FP") { |
| val = as.scalar(rand(rows=1, cols=1, min=minVal, |
| max=maxVal, pdf="uniform")); |
| OpParam[1, j] = val |
| } |
| else if(type == "INT") { |
| # val = ifelse(minVal == maxVal , minVal, as.scalar(sample(maxVal, 1))); |
| val = round(as.scalar(rand(rows=1, cols=1, min=minVal, |
| max=maxVal, pdf="uniform"))); |
| OpParam[1, j] = val |
| } |
| else if(type == "BOOL") { |
| s = as.scalar(sample(2,1)) |
| b = as.integer(s-1) |
| OpParam[1, j] = b |
| } |
| else { |
| # TODO handle string set something like {,,} |
| print("invalid data type") |
| } |
| start = start + 2 |
| t = t + 1 |
| } |
| OpParam = cbind(OpParam, attachY, isVerbose) |
| } |
| else { |
| OpParam = attachY |
| OpParam = cbind(OpParam, isVerbose) |
| } |
| while(FALSE){} |
| paramList = append(paramList, OpParam) |
| } |
| } |
| |
| |
| listToVector = function(List[Unknown] hp, Boolean verbose) |
| return (Matrix[Double] hp_vec) |
| { |
| hp_vec = matrix(0,1,1) |
| len = length(hp) |
| for(k in 1:len) { |
| mat = as.matrix(hp[k]) |
| hpy = cbind(as.matrix(ncol(mat)), mat) |
| hp_vec = cbind(hp_vec, hpy) |
| } |
| hp_vec = hp_vec[1, 2:ncol(hp_vec)] |
| } |
| |
| |
| fclassify = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] X_val, |
| Matrix[Double] y_val, Boolean isWeighted) |
| return (Double accuracy) |
| { |
| K = 10 |
| if(max(Y) == min(Y)) { |
| print("Y contains only one class") |
| accuracy = as.double(0) |
| } |
| else |
| { |
| print("STARTING "+K+" CROSS VALIDATIONS") |
| # do the k = 3 cross validations |
| accuracyMatrix = crossV(rbind(X, X_val), rbind(Y, y_val), K, isWeighted) |
| acc = colMeans(accuracyMatrix) |
| accuracy = as.scalar(acc[1,1]) |
| print("validation accuracy "+accuracy) |
| } |
| } |
| |
| crossV = function(Matrix[double] X, Matrix[double] y, Integer k, Boolean isWeighted) |
| return (Matrix[Double] accuracyMatrix) |
| { |
| |
| #create empty lists |
| dataset_X = list(); #empty list |
| dataset_y = list(); |
| fs = ceil(nrow(X)/k); |
| off = fs - 1; |
| #divide X, y into lists of k matrices |
| for (i in seq(1, k)) { |
| dataset_X = append(dataset_X, X[i*fs-off : min(i*fs, nrow(X)),]); |
| dataset_y = append(dataset_y, y[i*fs-off : min(i*fs, nrow(y)),]); |
| } |
| |
| accuracyMatrix = matrix(0, k, 2) |
| |
| #keep one fold for testing in each iteration |
| for (i in seq(1, k)) { |
| [tmpX, testX] = remove(dataset_X, i); |
| [tmpy, testy] = remove(dataset_y, i); |
| trainX = rbind(tmpX); |
| trainy = rbind(tmpy); |
| trainX = trainX[,1:ncol(X)] # TODO improve list size propagation |
| testX = as.matrix(testX) |
| testy = as.matrix(testy) |
| beta = multiLogReg(X=trainX, Y=trainy, icpt=2, reg=0.00001, tol=1e-9, maxi=50, |
| maxii= 50, verbose=FALSE); |
| [prob, yhat, a] = multiLogRegPredict(testX, beta, testy, FALSE) |
| |
| accuracy = getAccuracy(testy, yhat, isWeighted) |
| accuracyMatrix[i, 1] = accuracy |
| |
| } |
| } |
| |
| # extract the top k pipelines |
| extractTopK = function(List[Unknown] pipeline, List[Unknown] hyperparam, |
| Double testAccuracy, Integer k) |
| return (Frame[Unknown] bestPipeline, Matrix[Double] bestHyperparams) |
| { |
| len = length(pipeline) |
| print("length "+len) |
| # process the pipelines |
| bestPipeline = as.frame(pipeline[1]) |
| pipelineLength = ncol(bestPipeline) |
| if(len > 1) |
| { |
| for(i in 2:length(pipeline)) |
| { |
| right = as.frame(pipeline[i, 1:pipelineLength]) |
| bestPipeline = rbind(bestPipeline, right ) |
| } |
| } |
| |
| # process the hyper-parameters |
| pipLen = matrix(0, length(hyperparam), 1) |
| pipWidth = matrix(0, length(hyperparam), 1) |
| for(i in 1:length(hyperparam)) |
| { |
| mat = as.matrix(hyperparam[i]) |
| pipLen[i] = ncol(mat) |
| pipWidth[i] = nrow(mat) |
| } |
| rowLen = cumsum(pipWidth) |
| bestHyperparams = matrix(0, max(rowLen), max(pipLen)) |
| |
| start = 1 |
| for(i in 1: length(hyperparam)) |
| { |
| matSep = as.scalar(rowLen[i]) |
| vect = as.matrix(hyperparam[i]) |
| bestHyperparams[start:matSep, 1:ncol(vect)] = vect |
| start = matSep + 1 |
| } |
| |
| mask = (bestHyperparams[, 1] < testAccuracy) == 0 |
| bestPipeline = frameRmEmpty(bestPipeline, mask) |
| bestHyperparams = removeEmpty(target = bestHyperparams, margin = "rows", select = mask) |
| |
| bestPipeline = frameSort(bestPipeline) |
| bestHyperparams = order(target = bestHyperparams, by = 1, decreasing=TRUE, index.return=FALSE) |
| rowIndex = ifelse(nrow(bestPipeline) > k, k, nrow(bestPipeline)) |
| |
| bestPipeline = bestPipeline[1:rowIndex,] |
| bestHyperparams = bestHyperparams[1:rowIndex,] |
| |
| } |
| |
| # remove empty wrapper for frames |
| frameRmEmpty = function(Frame[Unknown] frameblock, Matrix[Double] selectMatrix) |
| return (Frame[Unknown] frameblock) |
| { |
| idx = seq(1, ncol(frameblock)) |
| # get the indexes of columns for recode transformation |
| index = vectorToCsv(idx) |
| # recode logical pipelines for easy handling |
| jspecR = "{ids:true, recode:["+index+"]}"; |
| [X, M] = transformencode(target=frameblock, spec=jspecR); |
| X = removeEmpty(target = X, margin = "rows", select = selectMatrix) |
| frameblock = transformdecode(target = X, spec = jspecR, meta = M) |
| } |
| |
| |
| # smote wrapper for doing relative over-sampling |
| SMOTE = function(Matrix[Double] X, Matrix[Double] Y, Boolean verbose) |
| return (Matrix[Double] XY) |
| { |
| |
| XY = order(target = cbind(Y, X), by = 1, decreasing=FALSE, index.return=FALSE) |
| # get the class count |
| classes = table(Y, 1) |
| print("before smote") |
| print(toString(classes)) |
| start_class = 1 |
| end_class = 0 |
| k = table(Y, 1) |
| getMax = max(k) |
| maxKIndex = as.scalar(rowIndexMax(t(k))) |
| outSet = matrix(0, 0, ncol(XY)) |
| |
| for(i in 1: nrow(k)) { |
| end_class = end_class + as.scalar(classes[i]) |
| class_t = XY[start_class:end_class, ] |
| remainingRatio = (round(getMax/nrow(class_t)) - 1) * 100 |
| if((i != maxKIndex)) { |
| # TODO implement SMOTE-NC for categorical data oversampling |
| synthesized = smote(class_t, remainingRatio, 1, FALSE) |
| outSet = rbind(outSet, synthesized) |
| if(verbose) { |
| print("max value: "+getMax) |
| print("values of i: "+i) |
| print("remaining ratio: "+remainingRatio) |
| } |
| } |
| start_class = end_class + 1 |
| } |
| |
| XY = rbind(XY, synthesized) |
| Y = XY[, 1] |
| X = XY[, 2:ncol(XY)] |
| XY = cbind(X,Y) |
| classes = table(Y, 1) |
| print("after smote") |
| print(toString(classes)) |
| } |
| |
| # constraints over hyper parameters |
| verifyHp = function(Integer index, Frame[Unknown] pip, Double minVal, Double maxVal, Integer paraNo) |
| return (Double minVal, Double maxVal) { |
| op = as.scalar(pip[1,index]) |
| # 1. if next op is pca then current op should not leave NaNs in data |
| # 2. if next op is mice then current op should not replace NaNs with zeros |
| |
| if((op == "outlierBySd" | op == "outlierByIQR") & index < ncol(pip) & paraNo == 2) |
| { |
| nextOp = as.scalar(pip[1, index + 1]) |
| if(nextOp == "pca" | nextOp == "abstain" | nextOp == "SMOTE") |
| { |
| maxVal = 1.0 |
| } |
| if(nextOp == "mice") |
| { |
| minVal = 2.0 |
| } |
| } |
| # print("now min and max val ") |
| # print(minVal+" "+maxVal) |
| |
| } |
| |
| # smote_nc = function(Matrix[Double] X, Integer s = 200, Matrix[Double] mask, Integer k = 1, Boolean verbose = FALSE) |
| # return (Matrix[Double] Y) { |
| |
| # if(s < 100 | (s%%100) != 0) |
| # { |
| # print("the number of samples should be an integral multiple of 100. Setting s = 100") |
| # s = 100 |
| # } |
| |
| # if(k < 1) { |
| # print("k should not be less than 1. Setting k value to default k = 1.") |
| # k = 1 |
| # } |
| |
| # # matrix to keep the index of KNN for each minority sample |
| # knn_index = matrix(0,k,nrow(X)) |
| # # find nearest neighbour |
| # for(i in 1:nrow(X)) |
| # { |
| # knn = nn(X, X[i, ], k) |
| # knn_index[, i] = knn |
| # } |
| |
| # # number of synthetic samples from each minority class sample |
| # iter = 0 |
| # iterLim = (s/100) |
| # # matrix to store synthetic samples |
| # synthetic_samples = matrix(0, iterLim*ncol(knn_index), ncol(X)) |
| |
| # # shuffle the nn indexes |
| # #rand_index = ifelse(k < iterLim, sample(k, iterLim, TRUE, 42), sample(k, iterLim, 42)) |
| # if (k < iterLim) |
| # rand_index = sample(k, iterLim, TRUE, 42); |
| # else |
| # rand_index = sample(k, iterLim, 42); |
| |
| # while(iter < iterLim) |
| # { |
| # # pick the random NN |
| # knn_sample = knn_index[as.scalar(rand_index[iter+1]),] |
| # # generate sample |
| # for(i in 1:ncol(knn_index)) |
| # { |
| # index = as.scalar(knn_sample[1,i]) |
| # X_diff = X[index,] - X[i, ] |
| # gap = as.scalar(Rand(rows=1, cols=1, min=0, max=1, seed = 42)) |
| # X_sys = X[i, ] + (gap*X_diff) |
| # synthetic_samples[iter*ncol(knn_index)+i,] = X_sys; |
| # } |
| # iter = iter + 1 |
| # } |
| |
| # Y = synthetic_samples |
| |
| # if(verbose) |
| # print(nrow(Y)+ " synthesized samples generated.") |
| |
| # } |
| |
| |
| |
| # nn = function(Matrix[Double] X, Matrix[Double] instance, Integer k ) |
| # return (Matrix[Double] knn_) |
| # { |
| # if(nrow(X) < k) |
| # stop("can not pick "+k+" nearest neighbours from "+nrow(X)+" total instances") |
| |
| # # compute the euclidean distance |
| # diff = X - instance |
| # square_diff = diff^2 |
| # distance = sqrt(rowSums(square_diff)) |
| # sort_dist = order(target = distance, by = 1, decreasing= FALSE, index.return = TRUE) |
| # knn_ = sort_dist[2:k+1,] |
| # } |
| |
| downSample = function(Matrix[Double] X, matrix[Double] Y) |
| return (Matrix[Double] XY) |
| { |
| # find the class distribution |
| classes = table(Y, 1) |
| XY = order(target = cbind(X,Y), by = ncol(X), decreasing = FALSE, index.return = FALSE) |
| # take minimum class out |
| minRecords = min(classes) |
| start_class = 1 |
| out_s = 1 |
| out_e = 0 |
| end_class = 0 |
| |
| out = matrix(0, minRecords * nrow(classes), ncol(XY)) |
| |
| for(i in 1:nrow(classes)) |
| { |
| end_class = end_class + as.scalar(classes[i]) |
| class_t = XY[start_class:end_class, ] |
| out_e = out_e + i * minRecords |
| out[out_s:out_e, ] = class_t[1:minRecords, ] |
| out_s = out_e + 1 |
| start_class = end_class + 1 |
| } |
| |
| } |