| #------------------------------------------------------------- |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| #------------------------------------------------------------- |
| |
| # In the bandit function, the objective is to find an arm that optimizes a known |
| # functional of the unknown arm-reward distributions; here the arms are candidate |
| # pipeline configurations, enumerated with a hyperband-style successive-halving |
| # schedule over several brackets. |
| # |
| # INPUT: |
| # ----------------------------------------------------------------------------------- |
| # X_train         --- training feature matrix |
| # Y_train         --- training label matrix |
| # X_test          --- test/validation feature matrix |
| # Y_test          --- test/validation label matrix |
| # metaList        --- list of meta information (e.g., masks and apply functions) |
| #                     forwarded to the pipeline execution |
| # evaluationFunc  --- name of the evaluation (scoring) function invoked via eval() |
| # evalFunHp       --- hyper-parameters of the evaluation function |
| # lp              --- frame of logical pipelines to enumerate |
| # lpHp            --- hyper-parameters and scores associated with the logical pipelines |
| # primitives      --- frame of physical primitives available per logical operator |
| # param           --- frame of hyper-parameter specifications of the physical operators |
| # k               --- number of best pipelines to keep per bracket and to return (top-k) |
| # R               --- maximum resources (hyper-parameter candidates) per configuration |
| # baseLineScore   --- baseline (dirty data) score that the pipelines should improve upon |
| # cv              --- TRUE to evaluate pipelines with cross-validation |
| # cvk             --- number of cross-validation folds |
| # ref             --- minimum-change threshold used by the pruning heuristic |
| # seed            --- random seed (-1 for non-deterministic behavior) |
| # enablePruning   --- TRUE to prune executions with insufficient data changes |
| # verbose         --- TRUE to print progress information |
| # ----------------------------------------------------------------------------------- |
| # |
| # OUTPUT: |
| # -------------------------------------- |
| # bestPipeline    --- frame of the top-k physical pipelines |
| # bestHyperparams --- hyper-parameters of the top-k pipelines |
| # bestAccuracy    --- accuracy scores of the top-k pipelines |
| # applyFunc       --- apply functions of the operators in the best pipelines |
| # -------------------------------------- |
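| # |
| # Illustrative usage sketch (assumption: the builtin is invoked as bandit(); X, Y, |
| # Xtest, Ytest, meta, logical, logicalHp, prim, paramSpec, dirtyScore and the |
| # evaluation function name "evalFunc" are placeholders prepared by the caller): |
| # |
| #   [pip, hp, acc, applyFn] = bandit(X_train=X, Y_train=Y, X_test=Xtest, Y_test=Ytest, |
| #     metaList=meta, evaluationFunc="evalFunc", evalFunHp=as.matrix(NaN), lp=logical, |
| #     lpHp=logicalHp, primitives=prim, param=paramSpec, k=3, R=50, |
| #     baseLineScore=dirtyScore, cv=TRUE, cvk=3, verbose=TRUE) |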
| |
| m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, Matrix[Double] X_test, Matrix[Double] Y_test, List[Unknown] metaList, |
| String evaluationFunc, Matrix[Double] evalFunHp, Frame[Unknown] lp, Matrix[Double] lpHp, Frame[Unknown] primitives, Frame[Unknown] param, Integer k = 3, |
| Integer R=50, Double baseLineScore, Boolean cv, Integer cvk = 2, Double ref = 0, Integer seed = -1, Boolean enablePruning = FALSE, Boolean verbose = TRUE) |
| return (Frame[Unknown] bestPipeline, Matrix[Double] bestHyperparams, Matrix[Double] bestAccuracy, Frame[String] applyFunc) |
| { |
| print("Starting optimizer") |
| totalPruneCount = 0 |
| FLAG_VARIABLE = 5 |
| pipelines_executed = 0 |
| HYPERPARAM_LENGTH = ((ncol(lp) + 2) * FLAG_VARIABLE * 3) + 1 ## (num of cols in logical + 2) * 5 meta flag vars * 3 max hyper-params per op + 1 accuracy col |
| bestPipeline = frame("", rows=1, cols=1) |
| bestHyperparams = as.matrix(0) |
| bestAccuracy = as.matrix(0) |
| # initialize bandit variables |
| # variable names follow the publication in which the algorithm is introduced |
| eta = 2 # the halving ratio is fixed to 2 |
| s_max = floor(log(R,eta)); |
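| # e.g., with the defaults R=50 and eta=2: s_max = floor(log2(50)) = 5, i.e., five |
| # brackets are processed (s = 4..0 after s_max is decremented further below) |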
| # # compute weights for R and then increase/decrease R with respect to importance of configurations |
| |
| weight = matrix(1/s_max , rows=s_max, cols=1) |
| weight = cumsum(weight) |
| # weight = matrix(1, rows=s_max, cols=1) |
| # print("weight matrix: "+toString(weight)) |
| # initialize output variables |
| hparam = matrix(0, rows=k*(s_max), cols=HYPERPARAM_LENGTH) |
| pipeline = matrix(0, rows=k*(s_max), cols=4) |
| endIdx = matrix(k, rows=(s_max), cols=1) |
| endIdx = cumsum(endIdx) |
| startIdx = (endIdx - k) + 1 |
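| # e.g., with the defaults k=3 and s_max=5: endIdx = [3,6,9,12,15], startIdx = [1,4,7,10,13], |
| # i.e., each bracket writes its best k results into its own block of the output matrices |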
| logicalBest = lp |
| n = ifelse(s_max >= nrow(lp), nrow(lp), ceil(nrow(lp)/s_max)) |
| pipelineId = as.frame(seq(1, nrow(lp))) |
| lp = cbind(pipelineId, lp) |
| mainLookup = lp |
| B = (s_max + 1) * R; |
| s_max = s_max - 1 |
| idx = 1 |
| for(s in s_max:0) { |
| |
| # result variables |
| bracket_hp = matrix(0, rows=k*(s+1)+k, cols=HYPERPARAM_LENGTH) |
| bracket_pipel = matrix(0, rows=k*(s+1)+k, cols=4) |
| start=1; end=0; |
| # # compute the initial resources r for this bracket, scaled by the bracket weight |
| r = max(R * as.scalar(weight[((s_max - s) + 1)]) * eta^(-s), 1); |
| configurations = lp[1:(min(n, nrow(lp)))] |
| # append configuration keys for extracting the pipeline later on |
| id = seq(1, nrow(configurations)) |
| configurations = cbind(as.frame(id), configurations) |
| |
| for(i in 0:s) { |
| # successive halving |
| n_i = min(max(as.integer(floor(n * eta^(-i))), 1), nrow(configurations)); |
| r_i = as.integer(floor(r * eta^i)); |
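| # illustration of the halving schedule (eta=2): starting from, e.g., n=8 and r=1, |
| # iteration i=0 runs 8 configurations with 1 resource each, i=1 the best 4 with 2, |
| # i=2 the best 2 with 4, and i=3 the best 1 with 8 |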
| |
| if(verbose) { |
| print("no of configurations ---------"+n_i) |
| print("no of resources --------------"+r_i) |
| print("iteration ---------------------"+i+" out of "+s) |
| } |
| configurations = configurations[1:n_i, ] |
| pipelines_executed = pipelines_executed + (n_i * r_i) |
| [outPip,outHp, pruneCount] = run_with_hyperparam(ph_pip=configurations, r_i=r_i, X=X_train, Y=Y_train, Xtest=X_test, Ytest=Y_test, metaList=metaList, |
| evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, param=param, cv=cv, cvk=cvk, ref=ref, seed = seed, enablePruning=enablePruning) |
| totalPruneCount = totalPruneCount + pruneCount |
| # sort the pipelines in decreasing order of accuracy |
| IX = order(target = outPip, by = 1, decreasing=TRUE, index.return=TRUE) |
| P = table(seq(1,nrow(IX)), IX, nrow(IX), nrow(outPip)); |
| a = P %*% outPip |
| b = P %*% outHp |
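| # IX holds the row permutation of the sort; table(seq, IX) expands it into a permutation |
| # matrix P, so a and b are outPip and outHp reordered by decreasing accuracy |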
| rowIndex = min(k, nrow(a)) |
| |
| # maintain the brackets results |
| end = end + rowIndex |
| bracket_pipel[start:end, 1:ncol(a)] = a[1:rowIndex,] |
| bracket_hp[start:end, 1:ncol(b)] = b[1:rowIndex,] |
| start = end + 1 |
| |
| # sort the configurations for successive halving |
| average_perf = getMaxPerConf(outPip, nrow(configurations)) |
| sortMask = matrix(1, rows=1, cols=ncol(configurations) + 1) |
| sortMask[1,1] = 0 |
| configurations = frameSort(cbind(average_perf, configurations), sortMask, TRUE) |
| configurations = configurations[, 2:ncol(configurations)] |
| } |
| if(n < nrow(lp)) |
| lp = lp[n+1:nrow(lp),] |
| bracket_pipel = removeEmpty(target=bracket_pipel, margin="rows") |
| bracket_hp = removeEmpty(target=bracket_hp, margin="rows") |
| # keep the best k results for each bracket |
| [bracket_bestPipeline, bracket_bestHyperparams] = extractBracketWinners(bracket_pipel, bracket_hp, k) |
| # optimize by the features |
| startOut = as.scalar(startIdx[idx]) |
| endOut = min(as.scalar(endIdx[idx]), (startOut + nrow(bracket_bestPipeline) - 1)) |
| pipeline[startOut:endOut, ] = bracket_bestPipeline |
| hparam[startOut:endOut, 1:ncol(bracket_bestHyperparams)] = bracket_bestHyperparams |
| idx = idx + 1 |
| } |
| [bestPipeline, bestHyperparams, bestAccuracy] = extractTopK(pipeline, hparam, baseLineScore, k, mainLookup, lp, lpHp) |
| |
| imp = as.double(as.scalar(bestAccuracy[1, 1])) - as.double(baseLineScore) |
| perf = imp > 0 |
| applyFunc = bestPipeline |
| for(k in 1:nrow(bestPipeline)) |
| { |
| bestPip = removeEmpty(target=bestPipeline[k], margin="cols") |
| applyOp = getParamMeta(bestPip, param) |
| applyFunc[k, 1:ncol(applyOp)] = applyOp |
| } |
| if(verbose) { |
| print("dirty accuracy "+toString(baseLineScore)) |
| print("topk pipelines \n"+toString(bestPipeline)) |
| print("topk hyper params \n"+toString(bestHyperparams)) |
| print("topk scores: \n"+toString(bestAccuracy)) |
| print("evalHp: \n"+toString(evalFunHp)) |
| print("performance improvement "+ imp) |
| print("total physical pipelines to be executed: "+pipelines_executed) |
| print("prune count: "+totalPruneCount) |
| print("actual executed pipelines: "+(pipelines_executed - totalPruneCount)) |
| } |
| } |
| |
| # this method extracts the physical pipelines for a given logical pipeline |
| get_physical_configurations = function(Frame[String] logical, Scalar[int] numConfigs = 10, |
| Frame[Unknown] primitives) |
| return(Frame[String] physical) |
| { |
| # load the primitives |
| physical = as.frame("NaN") |
| ed = primitives[, 1] |
| mvi = primitives[, 2] |
| outliers = primitives[,3] |
| ec = primitives[, 4] |
| scale = primitives[, 5] |
| ci = primitives[, 6] |
| dummy = primitives[,7] |
| dim = primitives[, 8] |
| |
| operator = frame(0, rows=nrow(primitives), cols=ncol(logical)) # combine all logical primitives |
| parfor(j in 1:ncol(logical), check = 0) |
| { |
| # extract the physical primitives |
| if(as.scalar(logical[1,j]) == "ED") |
| operator[, j] = ed; |
| else if(as.scalar(logical[1,j]) == "EC") |
| operator[, j] = ec; |
| else if(as.scalar(logical[1,j]) == "OTLR") |
| operator[, j] = outliers; |
| else if(as.scalar(logical[1,j]) == "MVI") |
| operator[, j] = mvi; |
| else if(as.scalar(logical[1,j]) == "CI") |
| operator[, j] = ci; |
| else if(as.scalar(logical[1,j]) == "DIM") |
| operator[, j] = dim; |
| else if(as.scalar(logical[1,j]) == "DUMMY") |
| operator[, j] = dummy; |
| else if(as.scalar(logical[1,j]) == "SCALE") |
| operator[, j] = scale; |
| else print("invalid operation "+as.scalar(logical[1,j])) |
| } |
| physical = operator |
| } |
| |
| # # this method executes the given pipelines with their hyper-parameters |
| run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i = 1, Matrix[Double] X, Matrix[Double] Y, |
| Matrix[Double] Xtest, Matrix[Double] Ytest, List[Unknown] metaList, String evaluationFunc, Matrix[Double] evalFunHp, |
| Frame[Unknown] param, Boolean cv = FALSE, Integer cvk = 2, Double ref = 0, Integer seed = -1, Boolean enablePruning = FALSE, Boolean default = FALSE) |
| return (Matrix[Double] output_operator, Matrix[Double] output_hyperparam, Integer pruneCount, Matrix[Double] changesByPipMatrix) |
| { |
| # # # TODO there is a partial overlap but it is negligible so we will not rewrite the scripts but lineage based reuse will get rid of it |
| changesByPipMatrix = matrix(0, rows=nrow(ph_pip) * r_i, cols=1) |
| pruneCount = 0 |
| output_hp = matrix(0, nrow(ph_pip)*r_i, (ncol(ph_pip)) * 5 * 3) |
| output_accuracy = matrix(0, nrow(ph_pip)*r_i, 1) |
| output_pipelines = matrix(0, nrow(ph_pip)*r_i, 3) |
| # rows in validation set |
| ids = as.matrix(ph_pip[, 1:2]) |
| ph_pip = ph_pip[, 3:ncol(ph_pip)] |
| inputHpMatrix = matrix(0, nrow(ph_pip)*r_i, (ncol(ph_pip)) * 5 * 3 + 1) |
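| # layout of inputHpMatrix: one row per (pipeline, resource) instance; the first cell holds |
| # the number of hyper-parameter values, followed by the flattened hyper-parameter matrix |
| # (values plus meta flags per operator) of that instance |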
| # prepare the pipelines and resources |
| allPipelines = frame(0, rows = nrow(ph_pip) * r_i, cols=ncol(ph_pip)) |
| allApplyFunctions = frame(0, rows = nrow(ph_pip) * r_i, cols=ncol(ph_pip)) |
| allIds = matrix(0, rows = nrow(ph_pip) * r_i, cols=2) |
| start = 1 |
| end = 0 |
| i = 1  # global instance counter, i.e., row in inputHpMatrix/allPipelines |
| p = 1  # pipeline counter, i.e., row in ph_pip |
| while(p <= nrow(ph_pip)) |
| { |
| op = removeEmpty(target=ph_pip[p], margin="cols") |
| [hp, applyFunctions, no_of_res, no_of_flag_vars] = getHyperparam(op, param, r_i, default, seed, enablePruning) |
| tmpFrameApply = frame(0, rows=no_of_res, cols=ncol(applyFunctions)) |
| tmpFrameApply = freplicate(tmpFrameApply, applyFunctions) |
| tmpFramePip = frame(0, rows=no_of_res, cols=ncol(op)) |
| tmpFramePip = freplicate(tmpFramePip, op) |
| tmpIds = matrix(1, rows=no_of_res, cols=2) * ids[p] |
| end = start + (no_of_res - 1) |
| allPipelines[start:end, 1:ncol(op)] = tmpFramePip |
| allApplyFunctions[start:end, 1:ncol(op)] = tmpFrameApply |
| allIds[start:end,] = tmpIds |
| if(no_of_res == 1) { |
| hp = matrix(hp, rows=no_of_res, cols = ncol(hp) * ncol(op)) |
| inputHpMatrix[i, 1:ncol(hp)+1] = cbind(as.matrix(ncol(hp)), hp) |
| i = i + 1 |
| } |
| else |
| { |
| for(r in 1:no_of_res) |
| { |
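| # hp stacks no_of_res candidate rows per operator block-wise; the cumulative offsets below |
| # select the r-th row of every block, and table() turns them into a 0/1 selection vector |
| # for removeEmpty |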
| indexes = matrix(no_of_res, rows=ncol(op), cols=1) |
| indexes[1, 1] = r |
| indexes = cumsum(indexes) |
| indexes = table(indexes, 1, 1, nrow(hp), 1) |
| hp_matrix = removeEmpty(target = hp, margin="rows", select = indexes) |
| tmp = matrix(hp_matrix, rows=1, cols=nrow(hp_matrix) * ncol(hp_matrix)) |
| tmp = cbind(as.matrix(ncol(tmp)), tmp) |
| inputHpMatrix[i, 1:ncol(tmp)] = tmp |
| i = i + 1 |
| } |
| } |
| p = p + 1 |
| start = end + 1 |
| } |
| allPipelines = removeEmpty(target = allPipelines, margin="rows") |
| inputHpMatrix = removeEmpty(target = inputHpMatrix, margin="rows") |
| opPre = allIds[1] |
| accuracy = 0 |
| changesByPip = 0 |
| |
| parfor(i in 1:nrow(allPipelines), check = 0) # opt=CONSTRAINED, mode=REMOTE_SPARK |
| { |
| evalFunOutput = as.matrix(0) |
| # execute configurations with r resources |
| op = removeEmpty(target=allPipelines[i], margin="cols") |
| opNew = allIds[i] |
| hp = inputHpMatrix[i] |
| totalVals = as.scalar(hp[1, 1]) + 1 |
| hp = hp[, 2:totalVals] |
| applyFunctions = allApplyFunctions[i] |
| no_of_res = nrow(hp) |
| # print("PIPELINE EXECUTION START ... "+toString(op)) |
| hpForPruning = matrix(0, rows=1, cols=ncol(op)) |
| changesByOp = matrix(0, rows=1, cols=ncol(op)) |
| metaList2 = metaList; # ensure metaList is not a result variable |
| metaList2["applyFunc"] = applyFunctions |
| hp_matrix = matrix(hp, rows=ncol(op), cols=ncol(hp)/ncol(op)) #removeEmpty(target = hp, margin="rows", select = indexes) |
| # # check if the pruning could be applied to avoid unnecessary executions |
| pruneSignal = pruningSignal(opPre, opNew, hp_matrix, hpForPruning, changesByOp) |
| opPre = opNew |
| executionSignal = TRUE #ifelse(enablePruning, pruneSignal, TRUE) |
| ref = ifelse(enablePruning, ref, 0) |
| if(executionSignal) |
| { |
| t1 = time() |
| if(cv) |
| { |
| pipList = list(ph = op, hp = hp_matrix, flags = no_of_flag_vars) |
| [accuracy, evalHp, hpForPruning, changesByOp, changesByPip] = crossV(X=X, y=Y, cvk=cvk, evalFunHp=evalFunHp, |
| pipList=pipList, metaList=metaList2, hpForPruning=hpForPruning, |
| changesByOp=changesByOp, evalFunc=evaluationFunc, ref=ref) |
| } |
| else |
| { |
| [eXtrain, eYtrain, eXtest, eYtest, Tr, hpForPruning, changesByOp, changesByPip] = executePipeline(pipeline=op, |
| Xtrain=X, Ytrain=Y, Xtest=Xtest, Ytest=Ytest, metaList=metaList2, hyperParameters=hp_matrix, hpForPruning=hpForPruning, |
| changesByOp=changesByOp, flagsCount=no_of_flag_vars, test=TRUE, verbose=FALSE, startInd=1, endInd=ncol(op)) |
| if(max(eYtrain) == min(eYtrain)) |
| print("Y contains only one class") |
| else if(changesByPip < ref) |
| print("prunningAlert 2: not training the model due to minimum changes") |
| else |
| evalFunOutput = eval(evaluationFunc, list(X=eXtrain, Y=eYtrain, Xtest=eXtest, Ytest=eYtest, Xorig=as.matrix(0), evalFunHp=evalFunHp)) |
| accuracy = as.scalar(evalFunOutput[1, 1]) |
| } |
| |
| # evalFunOutput = eval(evaluationFunc, argList) |
| matrix_width = as.matrix(nrow(hp_matrix) * ncol(hp_matrix)) |
| hp_vec = inputHpMatrix[i] |
| output_accuracy[i, 1] = accuracy |
| output_hp[i, 1:ncol(hp_vec)] = hp_vec |
| output_pipelines[i, ] = cbind(as.matrix(i), allIds[i,1:2]) |
| } |
| else |
| { |
| pruneCount = pruneCount + 1 |
| print("prunningAlert 1: not executing instance : "+i+" pruneCount"+pruneCount) |
| } |
| changesByPipMatrix[i] = changesByPip |
| } |
| sel = rowSums(output_hp) > 0 |
| output_hyperparam = removeEmpty(target=cbind(output_accuracy, output_hp), margin="rows", select = sel) |
| output_operator = removeEmpty(target=cbind(output_accuracy, output_pipelines), margin="rows", select = sel) |
| changesByPipMatrix = removeEmpty(target=changesByPipMatrix, margin="rows", select = sel) |
| } |
| |
| # extract the hyper-parameters for pipelines |
| getHyperparam = function(Frame[Unknown] pipeline, Frame[Unknown] hpList, Integer no_of_res, Boolean default, Integer seed = -1, Boolean enablePruning) |
| return (Matrix[Double] paramMatrix, Frame[Unknown] applyFunc, Integer no_of_res, Integer NUM_META_FLAGS) |
| { |
| allParam = 0; |
| NUM_META_FLAGS = 5 |
| NUM_DEFAULT_VALUES = 4 |
| |
| # load the hyper-parameters values |
| paramList = list() |
| # store the row indexes of the operator matches |
| [applyFunc, indexes, paramCount] = getParamMeta(pipeline, hpList) |
| |
| hpList = hpList[, 3:ncol(hpList)] |
| DEFAULT_INDEX = 7 |
| START_INDEX = 11 # value from where the hyper-params starts after skipping meta flags |
| |
| # if there are no hyper-parameters then change the number of resources |
| # so that the pipeline is only executed once and no resources are wasted, saving looping |
| no_of_res = ifelse(sum(paramCount) > 0, no_of_res, 1) |
| # the matrix below stores the different hyper-parameter combinations for each pipeline: |
| # for every operator it holds no_of_res rows, where each row stores one candidate set of |
| # hyper-parameter values; e.g., if the resource value rv = 10 and ncol(pip) = 3, the matrix |
| # has 10*3 = 30 rows, rows 1:10 holding the candidates of the 1st operator, 11:20 of the |
| # 2nd operator, and so on |
| # each row stores the no. of hps, the values of the hps, and the meta flags |
| paramMatrix = matrix(0, rows=ncol(pipeline)*no_of_res, cols=max(paramCount)+NUM_META_FLAGS+1) |
| |
| parfor(i in 1:ncol(pipeline), check=0) { |
| op = as.scalar(pipeline[1, i]) |
| index = as.scalar(indexes[i]) |
| no_of_param = as.integer(as.scalar(paramCount[i])) |
| # extract hasY and verbose flags |
| attachMask = matrix(as.scalar(hpList[index, 2]), rows=no_of_res, cols=1) |
| attachFD = matrix(as.scalar(hpList[index, 3]), rows=no_of_res, cols=1) |
| attachY = matrix(as.scalar(hpList[index, 4]), rows=no_of_res, cols=1) |
| isVerbose = matrix(as.scalar(hpList[index, 5]), rows=no_of_res, cols=1) |
| dataFlag = matrix(as.scalar(hpList[index, 6]), rows=no_of_res, cols=1) |
| if(no_of_param > 0) { |
| paramIdx = START_INDEX |
| typeIdx = START_INDEX |
| OpParam = matrix(0, rows=no_of_res, cols=max(paramCount)) |
| if(default | no_of_res == 1) { |
| OpParam[1, 1:no_of_param] = as.matrix(hpList[index, DEFAULT_INDEX:DEFAULT_INDEX+(no_of_param - 1)]) |
| } |
| else { |
| for(j in 1:no_of_param) { |
| type = as.scalar(hpList[index, typeIdx]) |
| paramValIndex = (no_of_param) + paramIdx |
| minVal = as.scalar(hpList[index, paramValIndex]) |
| maxVal = as.scalar(hpList[index, paramValIndex + 1]) |
| if(type == "FP") { |
| val = rand(rows=no_of_res, cols=1, min=minVal, max=maxVal, pdf="uniform", seed=seed); |
| OpParam[, j] = val; |
| } |
| else if(type == "INT") { |
| if(as.integer(maxVal) > no_of_res) |
| val = sample(as.integer(maxVal), no_of_res, FALSE, seed) |
| else |
| val = sample(as.integer(maxVal), no_of_res, TRUE, seed) |
| less_than_min = val < as.integer(minVal); |
| val = (less_than_min * minVal) + val; |
| OpParam[, j] = val; |
| } |
| else if(type == "BOOL") { |
| if(maxVal == 1) { |
| s = sample(2, no_of_res, TRUE, seed); |
| b = s - 1; |
| OpParam[, j] = b; |
| } |
| else |
| OpParam[, j] = matrix(0, rows=no_of_res, cols=1) |
| } |
| else |
| print("invalid data type") # TODO handle string set something like {,,} |
| |
| paramIdx = paramIdx + 2 |
| typeIdx = typeIdx + 1 |
| } |
| } |
| if((op == "outlierBySd" | op == "outlierByIQR" | op == "imputeByFd") & no_of_res > 1 & enablePruning) |
| OpParam = order(target=OpParam, by = 1, decreasing = FALSE, index.return = FALSE) |
| # hyper-parameter vector contains no. of hp, values of hp, and flag values |
| OpParam = cbind(matrix(no_of_param, rows=nrow(OpParam), cols=1),OpParam, attachMask, |
| attachFD, attachY, isVerbose, dataFlag) |
| } |
| else { |
| # no hyper-parameters, so create a dummy matrix of zeros so flags are always aligned |
| dummy = matrix(0, rows=no_of_res, cols=max(paramCount)+1) |
| OpParam = cbind(dummy, attachMask, attachFD, attachY) |
| OpParam = cbind(OpParam, isVerbose, dataFlag) |
| } |
| paramMatrix[((i-1)*no_of_res)+1:i*no_of_res, 1:ncol(OpParam)] = OpParam |
| } |
| } |
| |
| |
| # extract the top k pipelines as a final result after deduplication and sorting |
| extractTopK = function(Matrix[Double] pipeline, Matrix[Double] hyperparam, |
| Double baseLineScore, Integer k, Frame[Unknown] mainLookup, Frame[Unknown] bestLogical, Matrix[Double] lpHp) |
| return (Frame[Unknown] bestPipeline, Matrix[Double] bestHyperparams, Matrix[Double] bestAccuracy) |
| { |
| |
| bestLogical = as.matrix(bestLogical[, 1]) |
| pipeline = pipeline[, ncol(pipeline)] |
| blN = nrow(bestLogical) |
| lpHp = lpHp[nrow(lpHp)-blN+1:nrow(lpHp),] |
| IX = order(target = rbind(lpHp, hyperparam), by = 1, decreasing=TRUE, index.return=TRUE) |
| P = table(seq(1,nrow(IX)), IX, nrow(IX), nrow(rbind(lpHp, hyperparam))); |
| hyperparam = P %*% rbind(lpHp, hyperparam) |
| pipeline = P %*% rbind(bestLogical, pipeline) |
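| # the enumerated pipelines are stacked with the previously scored logical pipelines (lpHp) |
| # and jointly re-sorted by accuracy (column 1) via the permutation matrix P |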
| |
| # remove rows with accuracy below the baseline score |
| mask = (hyperparam[, 1] < baseLineScore) == 0 |
| if(sum(mask) == 0) |
| mask[1, 1] = 1 |
| hyperparam = removeEmpty(target = hyperparam, margin = "rows", select = mask) |
| pipeline = removeEmpty(target = pipeline, margin = "rows", select = mask) |
| |
| rowIndex = min(nrow(hyperparam), k) |
| # select the top k |
| bestAccuracy = hyperparam[1:rowIndex, 1] |
| bestHyperparams = hyperparam[1:rowIndex, 2:ncol(hyperparam)] |
| pipeline = pipeline[1:rowIndex] |
| # # # lookup for the pipelines |
| pipCode = pipeline[, 1] |
| |
| bestPipeline = frame(data="0", rows=nrow(pipeline), cols=ncol(mainLookup)) |
| parfor(i in 1: nrow(pipeline)) { |
| index = as.scalar(pipCode[i]) |
| bestPipeline[i] = mainLookup[index] |
| } |
| |
| bestPipeline = bestPipeline[, 2:ncol(bestPipeline)] |
| |
| } |
| |
| # extract the top k pipelines for each bracket, the intermediate results |
| extractBracketWinners = function(Matrix[Double] pipeline, Matrix[Double] hyperparam, Integer k) |
| return (Matrix[Double] bestPipeline, Matrix[Double] bestHyperparams) |
| { |
| # bestPipeline = frameSort(bestPipeline) |
| hyperparam = order(target = hyperparam, by = 1, decreasing=TRUE, index.return=FALSE) |
| pipeline = order(target = pipeline, by = 1, decreasing=TRUE, index.return=FALSE) |
| rowIndex = min(k, nrow(pipeline)) |
| |
| pipeline = pipeline[1:rowIndex,] |
| bestHyperparams = hyperparam[1:rowIndex,] |
| bestPipeline = pipeline[1:rowIndex] |
| } |
| |
| ########################################################################### |
| # The function will return the max performance by each individual pipeline |
| ############################################################################ |
| getMaxPerConf = function(Matrix[Double] pipelines, Double size) |
| return (Frame[Unknown] maxperconf) |
| { |
| tab = removeEmpty(target=table(pipelines[, 2], pipelines[, 3], pipelines[, 1]), margin="cols") |
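| # table(instance id, configuration id, accuracy) builds an (instances x configurations) |
| # accuracy matrix; colMaxs below then yields the best observed accuracy per configuration |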
| maxperconf = frame(0, rows=size, cols=1) |
| maxperconf[1:ncol(tab),] = as.frame(t(colMaxs(tab))) |
| } |
| |
| crossV = function(Matrix[double] X, Matrix[double] y, Integer cvk, Matrix[Double] evalFunHp, List[Unknown] pipList, List[Unknown] metaList, |
| Matrix[Double] hpForPruning = as.matrix(0), Matrix[Double] changesByOp = as.matrix(0), String evalFunc, Double ref = 0) |
| return (Double accuracy, Matrix[Double] evalFunHp, Matrix[Double] hpForPruning, Matrix[Double] changesByOp, Double allChanges) |
| { |
| |
| # # if no evaluation hyper-parameters are given, compute them via one call of the evaluation function on the training dataset |
| if(is.na(as.scalar(evalFunHp[1,1]))) { |
| forEvalHp = eval(evalFunc, list(X=X, Y=y, Xtest=X, Ytest=y, Xorig=as.matrix(0), evalFunHp=evalFunHp)) |
| evalFunHp = forEvalHp[1, 2:ncol(forEvalHp)] |
| } |
| changesByPip = 0 |
| cvChanges = matrix(0, rows=cvk, cols=ncol(changesByOp)) |
| accuracyMatrix = matrix(0, cvk, 1) |
| allChanges = matrix(0, cvk, 1) |
| #create empty lists |
| dataset_X = list(); #empty list |
| dataset_y = list(); |
| fs = ceil(nrow(X)/cvk); |
| off = fs - 1; |
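| # e.g., nrow(X)=10 and cvk=3 gives fs=4, so the folds cover rows 1-4, 5-8, and 9-10 |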
| #divide X, y into lists of k matrices |
| for (i in seq(1, cvk)) { |
| dataset_X = append(dataset_X, X[i*fs-off : min(i*fs, nrow(X)),]); |
| dataset_y = append(dataset_y, y[i*fs-off : min(i*fs, nrow(y)),]); |
| } |
| |
| beta_list = list(); |
| #keep one fold for testing in each iteration |
| for (i in seq(1, cvk)) { |
| [tmpX, testX] = remove(dataset_X, i); |
| [tmpy, testy] = remove(dataset_y, i); |
| trainX = rbind(tmpX); |
| trainy = rbind(tmpy); |
| testX = as.matrix(testX) |
| testy = as.matrix(testy) |
| if(as.scalar(pipList['flags']) != 0) # this flag is zero when CV is called from the dirtyScore function, meaning only accuracy computation and no pipeline execution |
| { |
| [trainX, trainy, testX, testy, Tr, hpForPruning, changesByOp, changesByPip] = executePipeline(pipeline=as.frame(pipList['ph']), |
| Xtrain=trainX, Ytrain=trainy, Xtest= testX, Ytest=testy, metaList=metaList, hyperParameters=as.matrix(pipList['hp']), hpForPruning=hpForPruning, |
| changesByOp=changesByOp, flagsCount=as.scalar(pipList['flags']), test=TRUE, verbose=FALSE, startInd=1, endInd=ncol(as.frame(pipList['ph']))) |
| #TODO double check why this is necessary |
| mincol = min(ncol(cvChanges),ncol(changesByOp)) |
| cvChanges[i, 1:mincol] = changesByOp[, 1:mincol]; # record the changes of the current fold |
| allChanges[i] = changesByPip |
| } |
| if(changesByPip < ref) |
| print("prunning alert 2: no training the model due to minimum changes") |
| else { |
| res = eval(evalFunc, list(X=trainX, Y=trainy, Xtest=testX, Ytest=testy, Xorig=as.matrix(0), evalFunHp=evalFunHp)) |
| accuracyMatrix[i] = res[1, 1] |
| } |
| |
| } |
| allChanges = min(allChanges) |
| changesByOp = colMaxs(cvChanges) |
| accuracy = mean(accuracyMatrix) |
| print("- cv accuracy: "+toString(accuracy)) |
| } |
| |
| pruningSignal = function(Matrix[Double] pipPre, Matrix[Double] pipNew, Matrix[Double] hp_matrix, Matrix[Double] hpForPruning, Matrix[Double] changesByOp) |
| return(Boolean execute) |
| { |
| execute = TRUE |
| prune = (hpForPruning > 0) & (changesByOp == 0) |
| changeCount = 0 |
| # # check if there exists a case where the changes done by an operation are zero |
| # check if pipelines are same |
| samePip = FALSE |
| samePip = sum(pipPre == pipNew) == ncol(pipPre) |
| if(sum(prune) > 0 & samePip ) |
| { |
| # get the non-zero index of hpForPruning |
| idx = (hpForPruning > 0) * t(seq(1, ncol(hpForPruning))) |
| idx = removeEmpty(target=idx, margin="cols") |
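| # e.g., hpForPruning = [0, 3, 0, 7] yields idx = [2, 4], i.e., the column positions of the |
| # operators with a recorded (prunable) hyper-parameter |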
| for(i in 1:ncol(idx)) { |
| index = as.scalar(idx[1, i]) |
| inProcessHp = as.scalar(hp_matrix[index, 2]) |
| prvHp = as.scalar(hpForPruning[1, index]) |
| if(inProcessHp > prvHp) |
| changeCount = changeCount + 1 |
| } |
| } |
| execute = !(changeCount > 0) |
| } |
| |
| getParamMeta = function(Frame[Unknown] pipeline, Frame[Unknown] hpList) |
| return(Frame[Unknown] applyFunc, Matrix[Double] indexes, Matrix[Double] paramCount) |
| { |
| indexes = matrix(0, rows= ncol(pipeline), cols=1) |
| paramCount = matrix(0, rows= ncol(pipeline), cols=1) |
| applyList = hpList[, 1] |
| applyFunc = pipeline |
| parfor(k in 1:ncol(pipeline)) |
| { |
| op = as.scalar(pipeline[1,k]) |
| hasParam = map(hpList[,2], "x->x.split(\",\")[0].equals(\""+op+"\")") |
| # convert the boolean vector to 0/1 matrix representation |
| m_hasParam = hasParam == frame("true", rows=nrow(hasParam), cols=1) |
| m_hasParam = as.matrix(m_hasParam) |
| # compute the relevant index |
| index = m_hasParam * seq(1, nrow(m_hasParam)) |
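| # the 0/1 indicator times a row-index sequence leaves a single non-zero entry: the row of |
| # hpList whose operator name matches op; removeEmpty extracts it as a scalar |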
| index = as.scalar(removeEmpty(target = index, margin = "rows")) |
| indexes[k] = index |
| paramCount[k] = as.integer(as.scalar(hpList[index, 3])) |
| applyFunc[1, k] = as.scalar(hpList[index, 1]) |
| } |
| } |
| |
| |
| |
| # # this method executes the given pipelines with their hyper-parameters |
| run_with_hyperparamNested = function(Frame[Unknown] ph_pip, Integer r_i = 1, Matrix[Double] X, Matrix[Double] Y, |
| Matrix[Double] Xtest, Matrix[Double] Ytest, List[Unknown] metaList, String evaluationFunc, Matrix[Double] evalFunHp, |
| Frame[Unknown] param, Boolean cv = FALSE, Integer cvk = 2, Double ref = 0, Integer seed = -1, Boolean enablePruning = FALSE, Boolean default = FALSE) |
| return (Matrix[Double] output_operator, Matrix[Double] output_hyperparam, Integer pruneCount, Matrix[Double] changesByPipMatrix) |
| { |
| # # # TODO there is a partial overlap but it is negligible so we will not rewrite the scripts but lineage based reuse will get rid of it |
| changesByPipMatrix = matrix(0, rows=nrow(ph_pip) * r_i, cols=1) |
| pruneCount = 0 |
| output_hp = matrix(0, nrow(ph_pip)*r_i, (ncol(ph_pip)) * 5 * 3) |
| output_accuracy = matrix(0, nrow(ph_pip)*r_i, 1) |
| output_pipelines = matrix(0, nrow(ph_pip)*r_i, 3) |
| # rows in validation set |
| clone_X = X |
| clone_Y = Y |
| clone_Xtest = Xtest |
| clone_Ytest = Ytest |
| index = 1 |
| ids = as.matrix(ph_pip[, 1:2]) |
| ph_pip = ph_pip[, 3:ncol(ph_pip)] |
| |
| parfor(i in 1:nrow(ph_pip), check = 0) # , opt=CONSTRAINED, mode=REMOTE_SPARK |
| { |
| evalFunOutput = as.matrix(0) |
| # execute configurations with r resources |
| op = removeEmpty(target=ph_pip[i], margin="cols") |
| # print("PIPELINE EXECUTION START ... "+toString(op)) |
| [hp, applyFunctions, no_of_res, no_of_flag_vars] = getHyperparam(op, param, r_i, default, seed, enablePruning) |
| hpForPruning = matrix(0, rows=1, cols=ncol(op)) |
| changesByOp = matrix(0, rows=1, cols=ncol(op)) |
| metaList2 = metaList; # ensure metaList is not a result variable |
| metaList2["applyFunc"] = applyFunctions |
| for(r in 1:no_of_res) |
| { |
| # the first block of no_of_res rows of the hp matrix belongs to the first operator, the next |
| # block to the second operator, and so on; we need to extract one row from each block |
| indexes = matrix(no_of_res, rows=ncol(op), cols=1) |
| indexes[1, 1] = r |
| indexes = cumsum(indexes) |
| indexes = table(indexes, 1, 1, nrow(hp), 1) |
| hp_matrix = removeEmpty(target = hp, margin="rows", select = indexes) |
| # # check if the pruning could be applied to avoid unnecessary executions |
| pruneSignal = pruningSignalNested(op, hp_matrix, hpForPruning, changesByOp) |
| executionSignal = ifelse(enablePruning, pruneSignal, TRUE) |
| ref = ifelse(enablePruning, ref, 0) |
| if(executionSignal) |
| { |
| t1 = time() |
| if(cv) |
| { |
| pipList = list(ph = op, hp = hp_matrix, flags = no_of_flag_vars) |
| [accuracy, evalHp, hpForPruning, changesByOp, changesByPip] = crossV(X=X, y=Y, cvk=cvk, evalFunHp=evalFunHp, |
| pipList=pipList, metaList=metaList2, hpForPruning=hpForPruning, |
| changesByOp=changesByOp, evalFunc=evaluationFunc, ref=ref) |
| } |
| else |
| { |
| [eXtrain, eYtrain, eXtest, eYtest, Tr, hpForPruning, changesByOp, changesByPip] = executePipeline(pipeline=op, |
| Xtrain=X, Ytrain=Y, Xtest=Xtest, Ytest=Ytest, metaList=metaList2, hyperParameters=hp_matrix, hpForPruning=hpForPruning, |
| changesByOp=changesByOp, flagsCount=no_of_flag_vars, test=TRUE, verbose=FALSE, startInd=1, endInd=ncol(op)) |
| if(max(eYtrain) == min(eYtrain)) |
| print("Y contains only one class") |
| else if(changesByPip < ref) |
| print("prunning alert 2: no training the model due to minimum changes") |
| else |
| evalFunOutput = eval(evaluationFunc, list(X=eXtrain, Y=eYtrain, Xtest=eXtest, Ytest=eYtest, Xorig=as.matrix(0), evalFunHp=evalFunHp)) |
| accuracy = as.scalar(evalFunOutput[1, 1]) |
| } |
| |
| # evalFunOutput = eval(evaluationFunc, argList) |
| matrix_width = as.matrix(nrow(hp_matrix) * ncol(hp_matrix)) |
| hp_vec = cbind(matrix_width, matrix(hp_matrix, rows=1, cols=nrow(hp_matrix)*ncol(hp_matrix), byrow=TRUE)) |
| index = (i - 1) * no_of_res + r |
| output_accuracy[index, 1] = accuracy |
| output_hp[index, 1:ncol(hp_vec)] = hp_vec |
| output_pipelines[index, ] = cbind(as.matrix(index), ids[i,1:2]) |
| } |
| else |
| { |
| pruneCount = pruneCount + 1 |
| print("prunningAlert: not executing instance : "+r+" pruneCount"+pruneCount) |
| } |
| changesByPipMatrix[index] = changesByPip |
| index = index + 1 |
| } |
| } |
| sel = rowSums(output_hp) > 0 |
| output_hyperparam = removeEmpty(target=cbind(output_accuracy, output_hp), margin="rows", select = sel) |
| output_operator = removeEmpty(target=cbind(output_accuracy, output_pipelines), margin="rows", select = sel) |
| changesByPipMatrix = removeEmpty(target=changesByPipMatrix, margin="rows", select = sel) |
| } |
| |
| pruningSignalNested = function(Frame[String] op, Matrix[Double] hp_matrix, Matrix[Double] hpForPruning, Matrix[Double] changesByOp) |
| return(Boolean execute) |
| { |
| execute = TRUE |
| prune = (hpForPruning > 0) & (changesByOp == 0) |
| changeCount = 0 |
| # # check if there exists a case where the changes done by an operation are zero |
| # check if pipelines are same |
| samePip = FALSE |
| |
| if(sum(prune) > 0) |
| { |
| # get the non-zero index of hpForPruning |
| idx = (hpForPruning > 0) * t(seq(1, ncol(hpForPruning))) |
| idx = removeEmpty(target=idx, margin="cols") |
| for(i in 1:ncol(idx)) { |
| index = as.scalar(idx[1, i]) |
| inProcessHp = as.scalar(hp_matrix[index, 2]) |
| prvHp = as.scalar(hpForPruning[1, index]) |
| if(inProcessHp > prvHp) |
| changeCount = changeCount + 1 |
| } |
| } |
| execute = !(changeCount > 0) |
| } |