blob: d39af18f80a9a6722ac9f3a435cad6a828d040c2 [file] [log] [blame]
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------
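# In the bandit function, the objective is to find an arm that optimizes
# a known functional of the unknown arm-reward distributions.
#
# INPUT:
# -----------------------------------------------------------------------------------
# X_train         --- Training features handed to the pipeline runs
# Y_train         --- Training targets handed to the pipeline runs
# X_test          --- Test features handed to the pipeline runs
# Y_test          --- Test targets handed to the pipeline runs
# metaList        --- List of metadata forwarded to the pipeline execution
# evaluationFunc  --- Name of the evaluation function that scores a pipeline
# evalFunHp       --- Hyper-parameters passed to the evaluation function
# lp              --- Logical pipelines, one pipeline per row
# lpHp            --- Result rows of the logical pipelines, merged into the top-k selection
# primitives      --- Frame of primitives (not referenced in the function body)
# param           --- Hyper-parameter metadata of the operators
# k               --- Number of best results to keep (default 3)
# R               --- Resource budget from which the number of brackets is derived (default 50)
# baseLineScore   --- Score that a result has to reach to be kept
# cv              --- If TRUE, pipelines are scored with cross validation
# cvk             --- Number of cross-validation folds (default 2)
# ref             --- Minimum amount of changes required for scoring when pruning is enabled (default 0)
# seed            --- Seed for the hyper-parameter sampling (default -1)
# enablePruning   --- Flag forwarded to the hyper-parameter sampling and pipeline runs (default FALSE)
# verbose         --- If TRUE, progress and results are printed (default TRUE)
# -----------------------------------------------------------------------------------
#
# OUTPUT:
# -----------------------------------------------------------------------------------
# bestPipeline    --- Best pipelines found, one per row
# bestHyperparams --- Hyper-parameters of the best pipelines
# bestAccuracy    --- Scores of the best pipelines
# applyFunc       --- Apply functions looked up for the operators of the best pipelines
# -----------------------------------------------------------------------------------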
m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, Matrix[Double] X_test, Matrix[Double] Y_test, List[Unknown] metaList,
  String evaluationFunc, Matrix[Double] evalFunHp, Frame[Unknown] lp, Matrix[Double] lpHp, Frame[Unknown] primitives, Frame[Unknown] param, Integer k = 3,
  Integer R=50, Double baseLineScore, Boolean cv, Integer cvk = 2, Double ref = 0, Integer seed = -1, Boolean enablePruning = FALSE, Boolean verbose = TRUE)
  return (Frame[Unknown] bestPipeline, Matrix[Double] bestHyperparams, Matrix[Double] bestAccuracy, Frame[String] applyFunc)
{
  # Runs one successive-halving bracket per value of s: every bracket takes the
  # next n rows of lp, evaluates them through run_with_hyperparam with a growing
  # number of resources, and keeps its k best results. The bracket winners are
  # finally merged by extractTopK and mapped to their apply functions.
  print("Starting optimizer")
  totalPruneCount = 0
  FLAG_VARIABLE = 5
  pipelines_executed = 0
  HYPERPARAM_LENGTH = ((ncol(lp) + 2) * FLAG_VARIABLE * 3) + 1 ## num of col in logical * 5 meta flag vars * max hyperparam per op + 1 accuracy col
  bestPipeline = frame("", rows=1, cols=1)
  bestHyperparams = as.matrix(0)
  bestAccuracy = as.matrix(0)
  # initialize bandit variables
  # variable names follow publication where algorithm is introduced
  eta = 2 # the halving ratio is fixed to 2
  s_max = floor(log(R,eta));
  # cumulative weights (1/s_max, 2/s_max, ..., 1) that scale R for the successive brackets
  weight = matrix(1/s_max , rows=s_max, cols=1)
  weight = cumsum(weight)
  # initialize output variables: k result rows are reserved for every bracket
  hparam = matrix(0, rows=k*(s_max), cols=HYPERPARAM_LENGTH)
  pipeline = matrix(0, rows=k*(s_max), cols=4)
  endIdx = matrix(k, rows=(s_max), cols=1)
  endIdx = cumsum(endIdx)
  startIdx = (endIdx - k) + 1
  # number of logical pipelines consumed per bracket
  n = ifelse(s_max >= nrow(lp), nrow(lp), ceil(nrow(lp)/s_max))
  # prepend a running id so that winners can be looked up in mainLookup at the end
  pipelineId = as.frame(seq(1, nrow(lp)))
  lp = cbind(pipelineId, lp)
  mainLookup = lp
  s_max = s_max - 1
  idx = 1
  for(s in s_max:0) {
    # result variables of this bracket
    bracket_hp = matrix(0, rows=k*(s+1)+k, cols=HYPERPARAM_LENGTH)
    bracket_pipel = matrix(0, rows=k*(s+1)+k, cols=4)
    start=1; end=0;
    # resources of the first halving round of this bracket (at least 1)
    r = max(R * as.scalar(weight[((s_max - s) + 1)]) * eta^(-s), 1);
    configurations = lp[1:(min(n, nrow(lp)))]
    # append configuration keys for extracting the pipeline later on
    id = seq(1, nrow(configurations))
    configurations = cbind(as.frame(id), configurations)
    for(i in 0:s) {
      # successive halving: fewer configurations, more resources per round
      n_i = min(max(as.integer(floor(n * eta^(-i))), 1), nrow(configurations));
      r_i = as.integer(floor(r * eta^i));
      if(verbose) {
        print("no of configurations ---------"+n_i)
        print("no of resources --------------"+r_i)
        print("iteration ---------------------"+i+" out of "+s)
      }
      configurations = configurations[1:n_i, ]
      pipelines_executed = pipelines_executed + (n_i * r_i)
      [outPip,outHp, pruneCount] = run_with_hyperparam(ph_pip=configurations, r_i=r_i, X=X_train, Y=Y_train, Xtest=X_test, Ytest=Y_test, metaList=metaList,
        evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, param=param, cv=cv, cvk=cvk, ref=ref, seed = seed, enablePruning=enablePruning)
      totalPruneCount = totalPruneCount + pruneCount
      # sort the pipelines by order of accuracy decreasing
      IX = order(target = outPip, by = 1, decreasing=TRUE, index.return=TRUE)
      P = table(seq(1,nrow(IX)), IX, nrow(IX), nrow(outPip));
      a = P %*% outPip
      b = P %*% outHp
      rowIndex = min(k, nrow(a))
      # maintain the brackets results
      end = end + rowIndex
      bracket_pipel[start:end, 1:ncol(a)] = a[1:rowIndex,]
      bracket_hp[start:end, 1:ncol(b)] = b[1:rowIndex,]
      start = end + 1
      # sort the configurations for successive halving
      average_perf = getMaxPerConf(outPip, nrow(configurations))
      sortMask = matrix(1, rows=1, cols=ncol(configurations) + 1)
      sortMask[1,1] = 0
      configurations = frameSort(cbind(average_perf, configurations), sortMask, TRUE)
      configurations = configurations[, 2:ncol(configurations)]
    }
    # drop the logical pipelines consumed by this bracket
    if(n < nrow(lp))
      lp = lp[n+1:nrow(lp),]
    bracket_pipel = removeEmpty(target=bracket_pipel, margin="rows")
    bracket_hp = removeEmpty(target=bracket_hp, margin="rows")
    # keep the best k results for each bracket
    [bracket_bestPipeline, bracket_bestHyperparams] = extractBracketWinners(bracket_pipel, bracket_hp, k)
    # copy the winners into the slot reserved for this bracket
    startOut = as.scalar(startIdx[idx])
    endOut = min(as.scalar(endIdx[idx]), (startOut + nrow(bracket_bestPipeline) - 1))
    pipeline[startOut:endOut, ] = bracket_bestPipeline
    hparam[startOut:endOut, 1:ncol(bracket_bestHyperparams)] = bracket_bestHyperparams
    idx = idx + 1
  }
  [bestPipeline, bestHyperparams, bestAccuracy] = extractTopK(pipeline, hparam, baseLineScore, k, mainLookup, lp, lpHp)
  imp = as.double(as.scalar(bestAccuracy[1, 1])) - as.double(baseLineScore)
  # replace every operator of the best pipelines by the apply function listed in param
  applyFunc = bestPipeline
  for(pipIdx in 1:nrow(bestPipeline))
  {
    bestPip = removeEmpty(target=bestPipeline[pipIdx], margin="cols")
    applyOp = getParamMeta(bestPip, param)
    applyFunc[pipIdx, 1:ncol(applyOp)] = applyOp
  }
  if(verbose) {
    print("dirty accuracy "+toString(baseLineScore))
    print("topk pipelines \n"+toString(bestPipeline))
    print("topk hyper params \n"+toString(bestHyperparams))
    print("topk scores: \n"+toString(bestAccuracy))
    print("evalHp: \n"+toString(evalFunHp))
    print("performance improvement "+ imp)
    print("total physical pipelines to be executed: "+pipelines_executed)
    print("prune count: "+totalPruneCount)
    print("actual executed pipelines: "+(pipelines_executed - totalPruneCount))
  }
}
# Builds the physical candidates of one logical pipeline: output column j holds
# the primitives column that belongs to the j-th logical operator.
get_physical_configurations = function(Frame[String] logical, Scalar[int] numConfigs = 10,
  Frame[Unknown] primitives)
  return(Frame[String] physical)
{
  # one column per logical operator, one row per primitive
  operator = frame(0, rows=nrow(primitives), cols=ncol(logical))
  parfor(j in 1:ncol(logical), check = 0)
  {
    opName = as.scalar(logical[1,j])
    # primitives columns: 1=ED, 2=MVI, 3=OTLR, 4=EC, 5=SCALE, 6=CI, 7=DUMMY, 8=DIM
    if(opName == "ED")
      operator[, j] = primitives[, 1];
    else if(opName == "EC")
      operator[, j] = primitives[, 4];
    else if(opName == "OTLR")
      operator[, j] = primitives[, 3];
    else if(opName == "MVI")
      operator[, j] = primitives[, 2];
    else if(opName == "CI")
      operator[, j] = primitives[, 6];
    else if(opName == "DIM")
      operator[, j] = primitives[, 8];
    else if(opName == "DUMMY")
      operator[, j] = primitives[, 7];
    else if(opName == "SCALE")
      operator[, j] = primitives[, 5];
    else print("invalid operation "+opName)
  }
  physical = operator
}
# Expands every pipeline of ph_pip into its hyper-parameter configurations
# (one row per pipeline and resource) and evaluates all rows in one parfor.
#   ph_pip: first two columns are ids, the remaining columns are the operators
#   r_i:    number of hyper-parameter configurations requested per pipeline
# Returns the scored runs as [accuracy, run index, ids] (output_operator) and
# [accuracy, hyper-parameter vector] (output_hyperparam), the number of pruned
# runs, and the changes reported for every run.
run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i = 1, Matrix[Double] X, Matrix[Double] Y,
  Matrix[Double] Xtest, Matrix[Double] Ytest, List[Unknown] metaList, String evaluationFunc, Matrix[Double] evalFunHp,
  Frame[Unknown] param, Boolean cv = FALSE, Integer cvk = 2, Double ref = 0, Integer seed = -1, Boolean enablePruning = FALSE, Boolean default = FALSE)
  return (Matrix[Double] output_operator, Matrix[Double] output_hyperparam, Integer pruneCount, Matrix[Double] changesByPipMatrix)
{
  # # # TODO there is a partial overlap but it is negligible so we will not rewrite the scripts but lineage based reuse will get rid of it
  changesByPipMatrix = matrix(0, rows=nrow(ph_pip) * r_i, cols=1)
  pruneCount = 0
  output_hp = matrix(0, nrow(ph_pip)*r_i, (ncol(ph_pip)) * 5 * 3)
  output_accuracy = matrix(0, nrow(ph_pip)*r_i, 1)
  output_pipelines = matrix(0, nrow(ph_pip)*r_i, 3)
  # split the id columns from the operator columns
  ids = as.matrix(ph_pip[, 1:2])
  ph_pip = ph_pip[, 3:ncol(ph_pip)]
  inputHpMatrix = matrix(0, nrow(ph_pip)*r_i, (ncol(ph_pip)) * 5 * 3 + 1)
  # prepare the pipelines and resources
  allPipelines = frame(0, rows = nrow(ph_pip) * r_i, cols=ncol(ph_pip))
  allApplyFunctions = frame(0, rows = nrow(ph_pip) * r_i, cols=ncol(ph_pip))
  allIds = matrix(0, rows = nrow(ph_pip) * r_i, cols=2)
  start = 1
  end = 0
  # i addresses the pipeline only; the rows start:end of the flattened buffers
  # belong to pipeline i, so that every pipeline is expanded exactly once
  for(i in 1:nrow(ph_pip))
  {
    op = removeEmpty(target=ph_pip[i], margin="cols")
    [hp, applyFunctions, no_of_res, no_of_flag_vars] = getHyperparam(op, param, r_i, default, seed, enablePruning)
    tmpFrameApply = frame(0, rows=no_of_res, cols=ncol(applyFunctions))
    tmpFrameApply = freplicate(tmpFrameApply, applyFunctions)
    tmpFramePip = frame(0, rows=no_of_res, cols=ncol(op))
    tmpFramePip = freplicate(tmpFramePip, op)
    tmpIds = matrix(1, rows=no_of_res, cols=2) * ids[i]
    end = start + (no_of_res - 1)
    allPipelines[start:end, 1:ncol(op)] = tmpFramePip
    allApplyFunctions[start:end, 1:ncol(op)] = tmpFrameApply
    allIds[start:end,] = tmpIds
    if(no_of_res == 1) {
      # single configuration: flatten hp into one row, prefixed by its length
      hp = matrix(hp, rows=no_of_res, cols = ncol(hp) * ncol(op))
      inputHpMatrix[start, 1:(ncol(hp)+1)] = cbind(as.matrix(ncol(hp)), hp)
    }
    else
    {
      for(r in 1:no_of_res)
      {
        # hp stores one block of no_of_res rows per operator; select row r of every block
        indexes = matrix(no_of_res, rows=ncol(op), cols=1)
        indexes[1, 1] = r
        indexes = cumsum(indexes)
        indexes = table(indexes, 1, 1, nrow(hp), 1)
        hp_matrix = removeEmpty(target = hp, margin="rows", select = indexes)
        tmp = matrix(hp_matrix, rows=1, cols=nrow(hp_matrix) * ncol(hp_matrix))
        tmp = cbind(as.matrix(ncol(tmp)), tmp)
        inputHpMatrix[start + r - 1, 1:ncol(tmp)] = tmp
      }
    }
    start = end + 1
  }
  allPipelines = removeEmpty(target = allPipelines, margin="rows")
  inputHpMatrix = removeEmpty(target = inputHpMatrix, margin="rows")
  opPre = allIds[1]
  accuracy = 0
  changesByPip = 0
  parfor(i in 1:nrow(allPipelines), check = 0) # opt=CONSTRAINED, mode=REMOTE_SPARK
  {
    evalFunOutput = as.matrix(0)
    # execute configurations with r resources
    op = removeEmpty(target=allPipelines[i], margin="cols")
    opNew = allIds[i]
    hp = inputHpMatrix[i]
    # the first entry stores the number of hyper-parameter values that follow
    totalVals = as.scalar(hp[1, 1]) + 1
    hp = hp[, 2:totalVals]
    applyFunctions = allApplyFunctions[i]
    no_of_res = nrow(hp)
    # print("PIPELINE EXECUTION START ... "+toString(op))
    hpForPruning = matrix(0, rows=1, cols=ncol(op))
    changesByOp = matrix(0, rows=1, cols=ncol(op))
    metaList2 = metaList; #ensure metaList is no result var
    metaList2["applyFunc"] = applyFunctions
    hp_matrix = matrix(hp, rows=ncol(op), cols=ncol(hp)/ncol(op)) #removeEmpty(target = hp, margin="rows", select = indexes)
    # # check if the pruning could be applied to avoid unnecessary executions
    pruneSignal = pruningSignal(opPre, opNew, hp_matrix, hpForPruning, changesByOp)
    opPre = opNew
    # the prune signal is currently not used to skip executions
    executionSingnal = TRUE #ifelse(enablePruning, pruneSignal, TRUE)
    ref = ifelse(enablePruning, ref, 0)
    if(executionSingnal)
    {
      if(cv)
      {
        pipList = list(ph = op, hp = hp_matrix, flags = no_of_flag_vars)
        [accuracy, evalHp, hpForPruning, changesByOp, changesByPip] = crossV(X=X, y=Y, cvk=cvk, evalFunHp=evalFunHp,
          pipList=pipList, metaList=metaList2, hpForPruning=hpForPruning,
          changesByOp=changesByOp, evalFunc=evaluationFunc, ref=ref)
      }
      else
      {
        [eXtrain, eYtrain, eXtest, eYtest, Tr, hpForPruning, changesByOp, changesByPip] = executePipeline(pipeline=op,
          Xtrain=X, Ytrain=Y, Xtest=Xtest, Ytest=Ytest, metaList=metaList2, hyperParameters=hp_matrix, hpForPruning=hpForPruning,
          changesByOp=changesByOp, flagsCount=no_of_flag_vars, test=TRUE, verbose=FALSE, startInd=1, endInd=ncol(op))
        # without an evaluation the score stays at the 0 of evalFunOutput
        if(max(eYtrain) == min(eYtrain))
          print("Y contains only one class")
        else if(changesByPip < ref)
          print("prunningAlert 2: not training the model due to minimum changes")
        else
          evalFunOutput = eval(evaluationFunc, list(X=eXtrain, Y=eYtrain, Xtest=eXtest, Ytest=eYtest, Xorig=as.matrix(0), evalFunHp=evalFunHp))
        accuracy = as.scalar(evalFunOutput[1, 1])
      }
      hp_vec = inputHpMatrix[i]
      output_accuracy[i, 1] = accuracy
      output_hp[i, 1:ncol(hp_vec)] = hp_vec
      output_pipelines[i, ] = cbind(as.matrix(i), allIds[i,1:2])
    }
    else
    {
      pruneCount = pruneCount + 1
      print("prunningAlert 1: not executing instance : "+i+" pruneCount"+pruneCount)
    }
    changesByPipMatrix[i] = changesByPip
  }
  # keep only the rows that were filled by an executed run
  sel = rowSums(output_hp) > 0
  output_hyperparam = removeEmpty(target=cbind(output_accuracy, output_hp), margin="rows", select = sel)
  output_operator = removeEmpty(target=cbind(output_accuracy, output_pipelines), margin="rows", select = sel)
  changesByPipMatrix = removeEmpty(target=changesByPipMatrix, margin="rows", select = sel)
}
# Samples the hyper-parameter values of every operator of one pipeline.
#   pipeline:  single-row frame of operator names
#   hpList:    hyper-parameter metadata frame, looked up through getParamMeta
#   no_of_res: requested number of value sets; reset to 1 if no operator has parameters
# paramMatrix stores one block of no_of_res rows per operator; every row holds
# the parameter count, the parameter values and the five meta flags.
getHyperparam = function(Frame[Unknown] pipeline, Frame[Unknown] hpList, Integer no_of_res, Boolean default, Integer seed = -1, Boolean enablePruning)
  return (Matrix[Double] paramMatrix, Frame[Unknown] applyFunc, Integer no_of_res, Integer NUM_META_FLAGS)
{
  NUM_META_FLAGS = 5
  DEFAULT_INDEX = 7  # first column of the default values
  START_INDEX = 11   # value from where the hyper-params starts after skipping meta flags
  # row of hpList and number of hyper-parameters for every operator
  [applyFunc, indexes, paramCount] = getParamMeta(pipeline, hpList)
  hpList = hpList[, 3:ncol(hpList)]
  # without any hyper-parameter a single execution is enough
  no_of_res = ifelse(sum(paramCount) > 0, no_of_res, 1)
  maxParams = max(paramCount)
  # e.g. no_of_res = 10 and ncol(pipeline) = 3 gives 30 rows: rows 1:10 belong to
  # the first operator, rows 11:20 to the second operator and so on
  paramMatrix = matrix(0, rows=ncol(pipeline)*no_of_res, cols=maxParams+NUM_META_FLAGS+1)
  parfor(i in 1:ncol(pipeline), check=0) {
    op = as.scalar(pipeline[1, i])
    index = as.scalar(indexes[i])
    no_of_param = as.integer(as.scalar(paramCount[i]))
    # meta flags of the operator, repeated for every resource row
    attachMask = matrix(as.scalar(hpList[index, 2]), rows=no_of_res, cols=1)
    attachFD = matrix(as.scalar(hpList[index, 3]), rows=no_of_res, cols=1)
    attachY = matrix(as.scalar(hpList[index, 4]), rows=no_of_res, cols=1)
    isVerbose = matrix(as.scalar(hpList[index, 5]), rows=no_of_res, cols=1)
    dataFlag = matrix(as.scalar(hpList[index, 6]), rows=no_of_res, cols=1)
    flags = cbind(attachMask, attachFD, attachY, isVerbose, dataFlag)
    if(no_of_param > 0) {
      OpParam = matrix(0, rows=no_of_res, cols=maxParams)
      if(default | (no_of_res == 1)) {
        # only the first row receives the default values
        OpParam[1, 1:no_of_param] = as.matrix(hpList[index, DEFAULT_INDEX:(DEFAULT_INDEX+(no_of_param - 1))])
      }
      else {
        for(j in 1:no_of_param) {
          # the type columns start at START_INDEX, the (min, max) pairs follow after them
          type = as.scalar(hpList[index, START_INDEX + (j - 1)])
          rangeIdx = no_of_param + START_INDEX + 2 * (j - 1)
          minVal = as.scalar(hpList[index, rangeIdx])
          maxVal = as.scalar(hpList[index, rangeIdx + 1])
          if(type == "FP") {
            val = rand(rows=no_of_res, cols=1, min=minVal, max=maxVal, pdf="uniform", seed=seed);
            OpParam[, j] = val;
          }
          else if(type == "INT") {
            # draw without replacement only if there are more values than draws
            if(as.integer(maxVal) > no_of_res)
              val = sample(as.integer(maxVal), no_of_res, FALSE, seed)
            else
              val = sample(as.integer(maxVal), no_of_res, TRUE, seed)
            # values below the minimum are shifted up by the minimum
            less_than_min = val < as.integer(minVal);
            val = (less_than_min * minVal) + val;
            OpParam[, j] = val;
          }
          else if(type == "BOOL") {
            if(maxVal == 1) {
              s = sample(2, no_of_res, TRUE, seed);
              OpParam[, j] = s - 1;
            }
            else
              OpParam[, j] = matrix(0, rows=no_of_res, cols=1)
          }
          else
            print("invalid data type") # TODO handle string set something like {,,}
        }
      }
      if((op == "outlierBySd" | op == "outlierByIQR" | op == "imputeByFd") & no_of_res > 1 & enablePruning)
        OpParam = order(target=OpParam, by = 1, decreasing = FALSE, index.return = FALSE)
      # hyper-parameter vector contains no. of hp, values of hp, and flag values
      OpParam = cbind(matrix(no_of_param, rows=nrow(OpParam), cols=1), OpParam, flags)
    }
    else {
      # no hyper-parameters, so create a dummy matrix of zeros so flags are always aligned
      OpParam = cbind(matrix(0, rows=no_of_res, cols=maxParams+1), flags)
    }
    paramMatrix[(((i-1)*no_of_res)+1):(i*no_of_res), 1:ncol(OpParam)] = OpParam
  }
}
# Merges the bracket winners with the remaining logical pipelines, sorts all
# rows by the score in column 1, drops rows below baseLineScore and returns the
# k best together with their pipelines looked up in mainLookup.
extractTopK = function(Matrix[Double] pipeline, Matrix[Double] hyperparam,
  Double baseLineScore, Integer k, Frame[Unknown] mainLookup, Frame[Unknown] bestLogical, Matrix[Double] lpHp)
  return (Frame[Unknown] bestPipeline, Matrix[Double] bestHyperparams, Matrix[Double] bestAccuracy)
{
  # ids of the logical pipelines and, from the last column, of the bracket winners
  logicalIds = as.matrix(bestLogical[, 1])
  winnerIds = pipeline[, ncol(pipeline)]
  # use the last rows of lpHp, as many as there are logical pipelines
  numLogical = nrow(logicalIds)
  lpHp = lpHp[(nrow(lpHp)-numLogical+1):nrow(lpHp),]
  allHp = rbind(lpHp, hyperparam)
  allIds = rbind(logicalIds, winnerIds)
  # permutation matrix that orders the rows by decreasing score
  IX = order(target = allHp, by = 1, decreasing=TRUE, index.return=TRUE)
  P = table(seq(1,nrow(IX)), IX, nrow(IX), nrow(allHp));
  sortedHp = P %*% allHp
  sortedIds = P %*% allIds
  # remove the row with accuracy less than test accuracy
  mask = (sortedHp[, 1] < baseLineScore) == 0
  if(sum(mask) == 0)
    mask[1, 1] = 1
  sortedHp = removeEmpty(target = sortedHp, margin = "rows", select = mask)
  sortedIds = removeEmpty(target = sortedIds, margin = "rows", select = mask)
  # select the top k
  top = min(nrow(sortedHp), k)
  bestAccuracy = sortedHp[1:top, 1]
  bestHyperparams = sortedHp[1:top, 2:ncol(sortedHp)]
  sortedIds = sortedIds[1:top]
  # lookup for the pipelines: the id is used as row index into mainLookup
  pipCode = sortedIds[, 1]
  bestPipeline = frame(data="0", rows=nrow(sortedIds), cols=ncol(mainLookup))
  parfor(i in 1: nrow(sortedIds)) {
    index = as.scalar(pipCode[i])
    bestPipeline[i] = mainLookup[index]
  }
  # drop the id column
  bestPipeline = bestPipeline[, 2:ncol(bestPipeline)]
}
# Returns the (at most) k best rows of one bracket. Both inputs are sorted
# independently by their first column in decreasing order.
extractBracketWinners = function(Matrix[Double] pipeline, Matrix[Double] hyperparam, Integer k)
  return (Matrix[Double] bestPipeline, Matrix[Double] bestHyperparams)
{
  sortedHp = order(target = hyperparam, by = 1, decreasing=TRUE, index.return=FALSE)
  sortedPip = order(target = pipeline, by = 1, decreasing=TRUE, index.return=FALSE)
  # never select more rows than are available
  top = min(k, nrow(sortedPip))
  bestHyperparams = sortedHp[1:top,]
  bestPipeline = sortedPip[1:top,]
}
###########################################################################
# Returns the best score reached by each configuration as a one-column frame
# with `size` rows. Column 1 of pipelines is the score, column 2 the run index
# and column 3 the configuration id.
# NOTE(review): all-zero columns are removed before the maximum is taken, so
# row j holds the j-th remaining configuration column - confirm that callers
# do not rely on row j being configuration id j.
############################################################################
getMaxPerConf = function(Matrix[Double] pipelines, Double size)
  return (Frame[Unknown] maxperconf)
{
  # scores arranged as runs x configurations
  scores = table(pipelines[, 2], pipelines[, 3], pipelines[, 1])
  scores = removeEmpty(target=scores, margin="cols")
  best = t(colMaxs(scores))
  maxperconf = frame(0, rows=size, cols=1)
  maxperconf[1:ncol(scores),] = as.frame(best)
}
# k-fold cross validation of one pipeline configuration.
#   pipList: list with the pipeline ('ph'), its hyper-parameters ('hp') and the
#            number of flags ('flags'); 'flags' == 0 skips the pipeline execution
#   ref:     folds whose reported changes are below ref are not evaluated
# Returns the mean score over all folds, the (possibly computed) evalFunHp, the
# pruning state of the last executed fold, the per-operator maximum of the
# changes over all folds and the minimum of the per-fold changes.
crossV = function(Matrix[double] X, Matrix[double] y, Integer cvk, Matrix[Double] evalFunHp, List[Unknown] pipList, List[Unknown] metaList,
  Matrix[Double] hpForPruning = as.matrix(0), Matrix[Double] changesByOp = as.matrix(0), String evalFunc, Double ref = 0)
  return (Double accuracy, Matrix[Double] evalFunHp, Matrix[Double] hpForPruning, Matrix[Double] changesByOp, Double allChanges)
{
  # # in the below condition we compute the hp using cv method on train dataset
  if(is.na(as.scalar(evalFunHp[1,1]))) {
    forEvalHp = eval(evalFunc, list(X=X, Y=y, Xtest=X, Ytest=y, Xorig=as.matrix(0), evalFunHp=evalFunHp))
    evalFunHp = forEvalHp[1, 2:ncol(forEvalHp)]
  }
  changesByPip = 0
  cvChanges = matrix(0, rows=cvk, cols=ncol(changesByOp))
  accuracyMatrix = matrix(0, cvk, 1)
  allChanges = matrix(0, cvk, 1)
  #create empty lists
  dataset_X = list(); #empty list
  dataset_y = list();
  fs = ceil(nrow(X)/cvk);
  off = fs - 1;
  #divide X, y into lists of k matrices
  for (i in seq(1, cvk)) {
    dataset_X = append(dataset_X, X[i*fs-off : min(i*fs, nrow(X)),]);
    dataset_y = append(dataset_y, y[i*fs-off : min(i*fs, nrow(y)),]);
  }
  #keep one fold for testing in each iteration
  for (i in seq(1, cvk)) {
    [tmpX, testX] = remove(dataset_X, i);
    [tmpy, testy] = remove(dataset_y, i);
    trainX = rbind(tmpX);
    trainy = rbind(tmpy);
    testX = as.matrix(testX)
    testy = as.matrix(testy)
    if(as.scalar(pipList['flags']) != 0) # this flag is zero when CV is called from the dirtyScore function, means only accuracy calculation but no pipeline execution
    {
      [trainX, trainy, testX, testy, Tr, hpForPruning, changesByOp, changesByPip] = executePipeline(pipeline=as.frame(pipList['ph']),
        Xtrain=trainX, Ytrain=trainy, Xtest= testX, Ytest=testy, metaList=metaList, hyperParameters=as.matrix(pipList['hp']), hpForPruning=hpForPruning,
        changesByOp=changesByOp, flagsCount=as.scalar(pipList['flags']), test=TRUE, verbose=FALSE, startInd=1, endInd=ncol(as.frame(pipList['ph'])))
      #TODO double check why this is necessary
      mincol = min(ncol(cvChanges),ncol(changesByOp))
      # one row per fold, so that colMaxs below aggregates over all folds
      cvChanges[i,1:mincol] = changesByOp[,1:mincol];
      allChanges[i] = changesByPip
    }
    # a skipped fold keeps its score of 0 in accuracyMatrix
    if(changesByPip < ref)
      print("prunning alert 2: no training the model due to minimum changes")
    else {
      res = eval(evalFunc, list(X=trainX, Y=trainy, Xtest=testX, Ytest=testy, Xorig=as.matrix(0), evalFunHp=evalFunHp))
      accuracyMatrix[i] = res[1, 1]
    }
  }
  allChanges = min(allChanges)
  changesByOp = colMaxs(cvChanges)
  accuracy = mean(accuracyMatrix)
  print("- cv accuracy: "+toString(accuracy))
}
# Decides whether a configuration has to be executed. execute becomes FALSE if
# the pipeline equals the previous one, at least one tracked operator
# (hpForPruning > 0) reported no changes, and for a tracked operator the value
# in column 2 of hp_matrix exceeds the value stored in hpForPruning.
pruningSignal = function(Matrix[Double] pipPre, Matrix[Double] pipNew, Matrix[Double] hp_matrix, Matrix[Double] hpForPruning, Matrix[Double] changesByOp)
  return(Boolean execute)
{
  tracked = hpForPruning > 0
  # tracked operators that made no changes
  prune = tracked & (changesByOp == 0)
  # check if pipelines are same
  samePip = sum(pipPre == pipNew) == ncol(pipPre)
  changeCount = 0
  if(sum(prune) > 0 & samePip )
  {
    # get the non-zero index of hpForPruning
    idx = removeEmpty(target=tracked * t(seq(1, ncol(hpForPruning))), margin="cols")
    for(i in 1:ncol(idx)) {
      index = as.scalar(idx[1, i])
      if(as.scalar(hp_matrix[index, 2]) > as.scalar(hpForPruning[1, index]))
        changeCount = changeCount + 1
    }
  }
  execute = (changeCount == 0)
}
# Looks up every operator of the single-row pipeline in hpList. An operator
# matches the row whose second column, cut at the first comma, equals its name.
# Returns the value of column 1 of the matching rows (applyFunc), the matching
# row numbers (indexes) and the integer value of column 3 (paramCount).
getParamMeta = function(Frame[Unknown] pipeline, Frame[Unknown] hpList)
  return(Frame[Unknown] applyFunc, Matrix[Double] indexes, Matrix[Double] paramCount)
{
  numOps = ncol(pipeline)
  indexes = matrix(0, rows= numOps, cols=1)
  paramCount = matrix(0, rows= numOps, cols=1)
  applyFunc = pipeline
  parfor(k in 1:ncol(pipeline))
  {
    op = as.scalar(pipeline[1,k])
    matches = map(hpList[,2], "x->x.split(\",\")[0].equals(\""+op+"\")")
    # convert the boolean vector to 0/1 matrix representation
    isMatch = as.matrix(matches == frame("true", rows=nrow(matches), cols=1))
    # row number of the match, all other rows become zero and are removed
    rowNo = isMatch * seq(1, nrow(isMatch))
    index = as.scalar(removeEmpty(target = rowNo, margin = "rows"))
    indexes[k] = index
    paramCount[k] = as.integer(as.scalar(hpList[index, 3]))
    applyFunc[1, k] = as.scalar(hpList[index, 1])
  }
}
# Nested variant of run_with_hyperparam: the parfor iterates over the pipelines
# of ph_pip and an inner loop executes the sampled hyper-parameter
# configurations of one pipeline one after another, so that the pruning state
# (hpForPruning, changesByOp) is carried from one configuration to the next.
#   ph_pip: first two columns are ids, the remaining columns are the operators
#   r_i:    number of hyper-parameter configurations requested per pipeline
# Returns the scored runs as [accuracy, run index, ids] (output_operator) and
# [accuracy, hyper-parameter vector] (output_hyperparam), the number of pruned
# runs, and the changes reported for every run.
run_with_hyperparamNested = function(Frame[Unknown] ph_pip, Integer r_i = 1, Matrix[Double] X, Matrix[Double] Y,
  Matrix[Double] Xtest, Matrix[Double] Ytest, List[Unknown] metaList, String evaluationFunc, Matrix[Double] evalFunHp,
  Frame[Unknown] param, Boolean cv = FALSE, Integer cvk = 2, Double ref = 0, Integer seed = -1, Boolean enablePruning = FALSE, Boolean default = FALSE)
  return (Matrix[Double] output_operator, Matrix[Double] output_hyperparam, Integer pruneCount, Matrix[Double] changesByPipMatrix)
{
  # # # TODO there is a partial overlap but it is negligible so we will not rewrite the scripts but lineage based reuse will get rid of it
  changesByPipMatrix = matrix(0, rows=nrow(ph_pip) * r_i, cols=1)
  pruneCount = 0
  output_hp = matrix(0, nrow(ph_pip)*r_i, (ncol(ph_pip)) * 5 * 3)
  output_accuracy = matrix(0, nrow(ph_pip)*r_i, 1)
  output_pipelines = matrix(0, nrow(ph_pip)*r_i, 3)
  # split the id columns from the operator columns
  ids = as.matrix(ph_pip[, 1:2])
  ph_pip = ph_pip[, 3:ncol(ph_pip)]
  # defined up front, as in run_with_hyperparam, so both are set even for a pruned run
  accuracy = 0
  changesByPip = 0
  parfor(i in 1:nrow(ph_pip), check = 0) # , opt=CONSTRAINED, mode=REMOTE_SPARK
  {
    evalFunOutput = as.matrix(0)
    # execute configurations with r resources
    op = removeEmpty(target=ph_pip[i], margin="cols")
    # print("PIPELINE EXECUTION START ... "+toString(op))
    [hp, applyFunctions, no_of_res, no_of_flag_vars] = getHyperparam(op, param, r_i, default, seed, enablePruning)
    hpForPruning = matrix(0, rows=1, cols=ncol(op))
    changesByOp = matrix(0, rows=1, cols=ncol(op))
    metaList2 = metaList; #ensure metaList is no result var
    metaList2["applyFunc"] = applyFunctions
    for(r in 1:no_of_res)
    {
      # output row of this run: the buffers reserve r_i rows per pipeline, and
      # no_of_res may be smaller than r_i, so r_i has to be used as the stride
      index = (i - 1) * r_i + r
      # as the matrix first block of r rows belongs to first operator and r+1 block of rows to second operator
      # we need to extract a row from each block
      indexes = matrix(no_of_res, rows=ncol(op), cols=1)
      indexes[1, 1] = r
      indexes = cumsum(indexes)
      indexes = table(indexes, 1, 1, nrow(hp), 1)
      hp_matrix = removeEmpty(target = hp, margin="rows", select = indexes)
      # # check if the pruning could be applied to avoid unnecessary executions
      pruneSignal = pruningSignalNested(op, hp_matrix, hpForPruning, changesByOp)
      executionSingnal = ifelse(enablePruning, pruneSignal, TRUE)
      ref = ifelse(enablePruning, ref, 0)
      if(executionSingnal)
      {
        if(cv)
        {
          pipList = list(ph = op, hp = hp_matrix, flags = no_of_flag_vars)
          [accuracy, evalHp, hpForPruning, changesByOp, changesByPip] = crossV(X=X, y=Y, cvk=cvk, evalFunHp=evalFunHp,
            pipList=pipList, metaList=metaList2, hpForPruning=hpForPruning,
            changesByOp=changesByOp, evalFunc=evaluationFunc, ref=ref)
        }
        else
        {
          [eXtrain, eYtrain, eXtest, eYtest, Tr, hpForPruning, changesByOp, changesByPip] = executePipeline(pipeline=op,
            Xtrain=X, Ytrain=Y, Xtest=Xtest, Ytest=Ytest, metaList=metaList2, hyperParameters=hp_matrix, hpForPruning=hpForPruning,
            changesByOp=changesByOp, flagsCount=no_of_flag_vars, test=TRUE, verbose=FALSE, startInd=1, endInd=ncol(op))
          if(max(eYtrain) == min(eYtrain))
            print("Y contains only one class")
          else if(changesByPip < ref)
            print("prunning alert 2: no training the model due to minimum changes")
          else
            evalFunOutput = eval(evaluationFunc, list(X=eXtrain, Y=eYtrain, Xtest=eXtest, Ytest=eYtest, Xorig=as.matrix(0), evalFunHp=evalFunHp))
          accuracy = as.scalar(evalFunOutput[1, 1])
        }
        # hyper-parameter vector: number of values followed by the flattened values
        matrix_width = as.matrix(nrow(hp_matrix) * ncol(hp_matrix))
        hp_vec = cbind(matrix_width, matrix(hp_matrix, rows=1, cols=nrow(hp_matrix)*ncol(hp_matrix), byrow=TRUE))
        output_accuracy[index, 1] = accuracy
        output_hp[index, 1:ncol(hp_vec)] = hp_vec
        output_pipelines[index, ] = cbind(as.matrix(index), ids[i,1:2])
      }
      else
      {
        pruneCount = pruneCount + 1
        print("prunningAlert: not executing instance : "+r+" pruneCount"+pruneCount)
      }
      changesByPipMatrix[index] = changesByPip
    }
  }
  # keep only the rows that were filled by an executed run
  sel = rowSums(output_hp) > 0
  output_hyperparam = removeEmpty(target=cbind(output_accuracy, output_hp), margin="rows", select = sel)
  output_operator = removeEmpty(target=cbind(output_accuracy, output_pipelines), margin="rows", select = sel)
  changesByPipMatrix = removeEmpty(target=changesByPipMatrix, margin="rows", select = sel)
}
# Decides whether the next configuration of a pipeline has to be executed.
# execute becomes FALSE if at least one tracked operator (hpForPruning > 0)
# reported no changes and, for a tracked operator, the value in column 2 of
# hp_matrix exceeds the value stored in hpForPruning. op is not read.
pruningSignalNested = function(Frame[String] op, Matrix[Double] hp_matrix, Matrix[Double] hpForPruning, Matrix[Double] changesByOp)
  return(Boolean execute)
{
  tracked = hpForPruning > 0
  # tracked operators that made no changes
  prune = tracked & (changesByOp == 0)
  changeCount = 0
  if(sum(prune) > 0)
  {
    # get the non-zero index of hpForPruning
    idx = removeEmpty(target=tracked * t(seq(1, ncol(hpForPruning))), margin="cols")
    for(i in 1:ncol(idx)) {
      index = as.scalar(idx[1, i])
      if(as.scalar(hp_matrix[index, 2]) > as.scalar(hpForPruning[1, index]))
        changeCount = changeCount + 1
    }
  }
  execute = (changeCount == 0)
}