blob: 3e705595fead8186752324e503dd6931f01cf0b0 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# import
source("./scripts/staging/pipelines/permutations.dml") as perm;
source("./scripts/staging/pipelines/utils.dml") as utils;
enumerator = function(Matrix[Double] X, Matrix[Double] Y, Frame[String] logical, Frame[String] outlierPrimitives,
Frame[String] mviPrimitives, Frame[String] param, Integer k, Boolean verbose = TRUE)
return(Frame[String] Kpipeline)
for(i in 1:2) {#nrow(logical)
operator = as.frame(matrix(0,nrow(outlierPrimitives),1)) #combine all logical primitives
for(j in 1:ncol(logical))
if(as.scalar(logical[i,j]) == "outlier")
operator = cbind(operator, outlierPrimitives);
else if(as.scalar(logical[i,j]) == "MVI")
operator = cbind(operator, mviPrimitives);
operator = operator[,2:ncol(operator)]
intermediates = perm::getPermutations(operator) # get the all possible combination of physical primitives
# for ith logical pipeline
print(" pipelines \n"+toString(intermediates))
[p, h] = executeAll(X, Y,intermediates, param, verbose);
Kpipeline = getFinalTopK(p, h, k)
print("top k pipelines of "+i+"th logical pipeline "+toString(Kpipeline))
# str = "top k pipelines of iteration "+i
# str = append(str, toString(Kpipeline))
# if(verbose)
# print("final top k pipelines \n"+toString(Kpipeline))
# write(str, "D:/Workspace/Pipelines/output/kpipeline.txt")
# The pipeline execution functions
# Runs every pipeline in `intermediates` (one pipeline per row) against X for
# each of its hyper-parameter settings and scores the result with fclassify.
#
# Returns
#   opt          the executed pipeline rows, one row per execution
#   hyper_param  one row per execution: for every operator the number of
#                hyper-parameter columns followed by the values used, with the
#                fclassify score appended as the last column
#
# NOTE(review): the block delimiters ({ / }) of the function body and of the
# while / for / if bodies are not present in this text. The nesting described
# in the comments below is inferred from the data flow -- confirm against the
# upstream file before relying on it.
# NOTE(review): executePipeline is not defined in the visible part of this
# file -- confirm where it comes from.
executeAll = function(Matrix[Double] X, Matrix[Double] Y, Frame[String] intermediates, Frame[String] param, Boolean verbose)
return(Frame[String] opt, Matrix[Double] hyper_param)
# keep a copy of the input so X can be reset after each execution
clone_X = X;
# initialize output variables
# "NA" together with zero rows in hyper_param marks "no result stored yet"
opt = as.frame("NA")
hyper_param = matrix(0,0,1)
print("total pipelines to be executed "+nrow(intermediates))
for(i in 1:nrow(intermediates)) {
paraList = list()
# i-th physical pipeline (a single frame row of operator names)
op = intermediates[i,]
# one matrix of hyper-parameter settings per operator in op
paraList = getInstanceParam(op, param)
# NOTE: the local variable `sum` shares its name with the builtin sum()
# that is called on it further down
sum = 1
print("executing "+toString(op))
while(sum > 0) #condition to terminate when all hyper parameters are executed
paramL = list()
hp_temp = matrix(0,1,0)
opt_temp = op
for(j in 1: length(paraList))
singleHp = as.matrix(paraList[j])
# record the number of hyper-parameter columns, then the values of the first row
hp_temp = cbind(hp_temp, as.matrix(ncol(singleHp)))
hp_temp = cbind(hp_temp, singleHp[1,])
paramL = append(paramL, singleHp[1, ])
# consume the first row so the next pass uses the next setting
# NOTE(review): presumably the three statements below all belong to this
# if-body -- the delimiters are missing, confirm
if(nrow(singleHp) > 1)
singleHp = singleHp[2:nrow(singleHp),]
paraList[j] = singleHp
sum = sum(singleHp)
# run the pipeline with the current settings and score labels + transformed data
X = executePipeline(op, X, paramL, FALSE)
data = cbind(Y, X)
acc = eval("fclassify", data)
hp_temp = cbind(hp_temp, acc)
# restore the untouched input for the next execution
X = clone_X
# first result replaces the placeholders, later results are appended
if(as.scalar(opt[1,1]) == "NA" & nrow(hyper_param) == 0)
opt = opt_temp
hyper_param = hp_temp
else {
opt = rbind(opt, opt_temp)
hyper_param = rbind(hyper_param, hp_temp)
X = clone_X
# The below functions will generate the all possible
# combinations for different hyper parameter values
# Builds, for one pipeline `instance` (a single row of operator names), a list
# L with one entry per operator holding the hyper-parameter settings to try.
#
# `param` is searched for a row whose first column equals the operator name;
# the remaining columns of that row become pVector. As read by the code below,
# pVector[1,1] is the number of hyper-parameters and columns 2..4 are used as
# start value, step and end value of the first hyper-parameter range.
#
# NOTE(review): the block delimiters of this function are largely missing in
# this text (e.g. the `{` opened after if(sum(parameters) == 0) has no visible
# `}` before the `else`), so the nesting cannot be read off the code -- confirm
# against the upstream file.
# NOTE(review): `p` is read in the while condition but never initialised in the
# visible code, and `coeff` is assigned but not read -- confirm.
getInstanceParam = function(Frame[String] instance, Frame[String] param )
return(list[Unknown] L)
L = list();
# accumulated combinations of hyper-parameter values (one row per combination)
parameters = matrix(0,0,1)
# number of hyper-parameters of each operator in the pipeline
hpNum = matrix(0,1,ncol(instance))
for(i in 1:ncol(instance)) {
pVector = matrix(0,1,ncol(param))
coeff = as.scalar(instance[1,i])
# look up the parameter row that belongs to the i-th operator
for(j in 1:nrow(param)) {
if(as.scalar(instance[1,i]) == as.scalar(param[j,1]))
pVector = as.matrix(param[j,2:ncol(param)])
hpNum[1,i] = as.scalar(pVector[1,1])
if(as.scalar(pVector[1,1]) > 0)
while(p <= as.integer(as.scalar(pVector[1,1])))
# print("check point 1")
count = 1;
# enumerate the values start, start+step, ... while they are <= end
kVector = matrix(0,as.scalar(pVector[1,4])/as.scalar(pVector[1,3]),1)
inner = as.scalar(pVector[1,2])
while(inner <= as.scalar(pVector[1,4]))
kVector[count,1] = inner;
inner = inner + as.scalar(pVector[1,3])
count = count+1
# zero the triple just consumed and drop the empty columns
# (presumably so the next hyper-parameter's triple moves into columns 2..4 -- confirm)
pVector[1,2] = 0; pVector[1,3]=0; pVector[1,4]=0;
pVector = removeEmpty(target = pVector, margin="cols")
p = p+1
# first value vector is padded/attached directly, later ones are combined
# with the existing rows through getParaCombinations
if(sum(parameters) == 0){
parameters = rbind(parameters, matrix(0, nrow(kVector)-nrow(parameters), ncol(parameters)))
parameters = cbind(parameters, kVector)
else parameters = getParaCombinations(parameters, kVector)
index = 1
parameters = removeEmpty(target = parameters, margin="cols")
# append an all-zero row at the end of the combinations
parameters = rbind(parameters, matrix(0,1,ncol(parameters)))
# hand each operator its own slice of columns from `parameters`
for(i in 1:ncol(instance))
if(as.scalar(hpNum[1, i]) > 0)
L = append(L, parameters[,index:(index+as.scalar(hpNum[1,i]))-1])
index = index+as.scalar(hpNum[1,i])
# NOTE(review): presumably the else-branch for operators without
# hyper-parameters, which get a 1x1 matrix holding -1 -- confirm
L = append(L, matrix(-1, 1, 1))
# Cross-combines the rows of `para` with the values of the column vector `vec`.
#
# For every value in vec a full copy of para is stacked, paired with a column
# that repeats that value, so the result has nrow(para) * nrow(vec) rows and
# ncol(para) + 1 columns. Rows are ordered by vec value first, then by para row.
getParaCombinations = function(Matrix[Double] para, Matrix[Double] vec)
return (Matrix[Double] para)
{
  v_temp = matrix(0,0,1)
  p_temp = matrix(0,0,ncol(para))
  for(i in 1:nrow(vec)) {
    # repeat the i-th value once per existing combination
    v = matrix(as.scalar(vec[i,1]), nrow(para), 1)
    v_temp = rbind(v_temp, v)
    p_temp = rbind(p_temp,para)
  }
  para = cbind(p_temp, v_temp)
}
# Returns TRUE only when the operator name `op` is exactly "mice".
isSpecial = function(String op)
return(Boolean yes){
  yes = (op == "mice")
}
# Returns the sum over all cells of all matrices stored in `paraList`.
#
# The accumulator is initialised BEFORE the loop and added to on every pass;
# previously it was overwritten inside the loop and reset to 0.0 afterwards,
# so the function always returned 0.0.
# `verbose` is accepted for interface compatibility and is not used.
getPipelineSum = function(List[Unknown] paraList, Boolean verbose)
return (Double psum)
{
  psum = 0.0
  # guard: nothing to add for an empty list
  if(length(paraList) > 0) {
    for(i in 1:length(paraList)) {
      psum = psum + sum(as.matrix(paraList[i]))
    }
  }
}
# This function will compute the top k pipelines from the results of all executions
# Selects the k best executions.
#
# pipeline    executed pipelines, one row per execution
# hparameter  numeric results per execution; the LAST column is used as the
#             ranking key (sorted in decreasing order)
# k           number of rows to return; stops with an error if fewer than k
#             rows are available
#
# Returns the first k rows of the combined (pipeline + hparameter) frame after
# sorting. The pipeline columns are recoded to numbers for sorting and decoded
# back afterwards.
getFinalTopK = function(Frame[String] pipeline, Matrix[Double] hparameter, Integer k)
return (Frame[String] pipeline)
{
  if(nrow(pipeline) < k)
    stop("the top k should be less than the total pipelines")
  # combine all parameter i.e., operation and hyper-parameter values
  allParam = cbind(pipeline, as.frame(hparameter))
  # get the indexes of columns for recode transformation
  idx = seq(1, ncol(pipeline))
  index = utils::vectorToCsv(idx)
  # encoding categorical columns using recode transformation
  jspecR = "{ids:true, recode:["+index+"]}";
  [X, M] = transformencode(target=allParam, spec=jspecR);
  # order() works on matrices, hence the encode / decode round trip
  top = order(target=X, by=ncol(X), decreasing=TRUE, index.return=FALSE);
  pipeline = transformdecode(target=top, spec=jspecR, meta=M);
  # TODO if k+n pipelines have same accuracy then how to return k pipelines
  pipeline = pipeline[1:k,]
}
# These private function are used to impute values by mean and by median
# Replaces every NaN in X with the mean of the observed (non-NaN) values of
# its column.
#
# The column mean is computed as colSums / number of observed cells. Using
# colMeans on the zero-filled matrix would divide by all rows and pull the
# imputed value towards 0.
# `verbose` is accepted for interface compatibility and is not used.
imputeByMean = function(Matrix[Double] X, Boolean verbose = FALSE)
return(Matrix[Double] X)
{
  Mask = is.nan(X)
  X = replace(target=X, pattern=NaN, replacement=0)
  # observed cells per column; clamped to 1 so a column without any observed
  # value yields 0 instead of 0/0
  observed = max(nrow(X) - colSums(Mask), 1)
  colMean = colSums(X) / observed
  # missing cells are 0 after replace(), so adding fills exactly those cells
  X = X + Mask * colMean
}
# Replaces every NaN in X with the median of the observed (non-NaN) values of
# its column.
#
# The median is taken over the observed cells only; taking it over the
# zero-filled column would count every missing cell as a 0 observation.
# `verbose` is accepted for interface compatibility and is not used.
imputeByMedian = function(Matrix[Double] X, Boolean verbose = FALSE)
return(Matrix[Double] X)
{
  cols = ncol(X)
  colMedian = matrix(0, 1, cols)
  Mask = is.nan(X)
  X = replace(target=X, pattern=NaN, replacement=0)
  parfor(i in 1:cols) {
    # keep only the rows of column i that were not NaN
    # NOTE(review): a column without any observed value relies on how
    # removeEmpty handles an empty result -- confirm
    observed = removeEmpty(target=X[,i], margin="rows", select=(Mask[,i] == 0))
    colMedian[, i] = median(observed)
  }
  # missing cells are 0 after replace(), so adding fills exactly those cells
  X = X + Mask * colMedian
}
# Function to evaluate the pipeline using classification accuracy
# Scores a data set by the accuracy of a multinomial logistic regression.
#
# X  labelled data: column 1 holds the class labels (must all be >= 1, the
#    function stops otherwise), the remaining columns hold the features
#
# Each row is assigned to the test set when a uniform random draw is <= 0.3
# and to the training set otherwise; the draw is not seeded, so the split (and
# therefore the score) differs between calls.
# Returns the accuracy reported by multiLogRegPredict on the test rows.
fclassify = function(Matrix[Double] X)
return (Double accuracy)
{
  if(min(X[,1]) < 1)
    stop("Y should contain value greater than zero")

  # random row split; select= picks the flagged rows directly instead of
  # multiplying with n x n diagonal selection matrices
  isTest = rand(rows=nrow(X), cols=1, min = 0, max = 1, sparsity=1) <= 0.3
  testSet = removeEmpty(target = X, margin = "rows", select = isTest)
  trainSet = removeEmpty(target = X, margin = "rows", select = (isTest == 0))

  # column 1 = labels, columns 2..d = features
  d = ncol(X)
  train_X = trainSet[, 2:d]
  train_Y = trainSet[, 1]
  test_X = testSet[, 2:d]
  test_Y = testSet[, 1]

  betas = multiLogReg(X=train_X, Y=train_Y, icpt=2, tol=1e-9, reg=1.2, maxi=100, maxii=0, verbose=FALSE)
  [prob, yhat, accuracy] = multiLogRegPredict(test_X, betas, test_Y, FALSE)
}
# Enumeration call
# $1: CSV data set with header; its first column is split off as the label
dataset = read($1, data_type="matrix", format="csv", header=TRUE);
# labels are shifted by one (fclassify rejects labels below 1)
labels = dataset[,1] + 1
features = dataset[,2:ncol(dataset)]

# $2: logical pipelines, $3: outlier primitives, $4: MVI primitives,
# $5: hyper-parameter table
logicalPipelines = read($2, data_type="frame", format="csv");
outlierOps = read($3, data_type="frame", format="csv");
mviOps = read($4, data_type="frame", format="csv");
hyperParams = read($5, data_type="frame", format="csv");

# keep the 5 best pipelines and store them at $6
topPipelines = enumerator(features, labels, logicalPipelines, outlierOps, mviOps, hyperParams, 5, TRUE);
write(topPipelines, $6, format="csv", sep=",")