blob: 4303b52aef742a8321cd2b9dd140e6ed8322fbd3 [file] [log] [blame]
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------
# Enumerates the physical pipelines of a logical pipeline, executes them and
# returns the k best ones.
#
# X, Y               feature matrix and label vector
# logical            logical pipelines, one per row; cells are "outlier" or "MVI"
# outlierPrimitives  physical primitives substituted for an "outlier" stage
# mviPrimitives      physical primitives substituted for an "MVI" stage
# param              hyper-parameter description, forwarded to executeAll
# k                  number of pipelines to return
# verbose            print the enumerated physical pipelines
#
# Returns the frame produced by getFinalTopK for the processed logical pipeline.
enumerator = function(Matrix[Double] X, Matrix[Double] Y, Frame[String] logical, Frame[String] outlierPrimitives,
  Frame[String] mviPrimitives, Frame[String] param, Integer k, Boolean verbose = TRUE)
return(Frame[String] Kpipeline)
{
  # only the first logical pipeline is processed; the full range would be 1:nrow(logical)
  for(i in 1:1) {
    # dummy first column so that primitives can be attached with cbind; dropped again below
    candidates = as.frame(matrix(0, nrow(outlierPrimitives), 1))
    for(j in 1:ncol(logical)) {
      stage = as.scalar(logical[i,j])
      if(stage == "outlier")
        candidates = cbind(candidates, outlierPrimitives);
      else if(stage == "MVI")
        candidates = cbind(candidates, mviPrimitives);
    }
    candidates = candidates[, 2:ncol(candidates)]

    # all possible combinations of physical primitives for the i-th logical pipeline
    physical = generatePermutations(candidates)
    if(verbose)
      print(" pipelines \n"+toString(physical))

    [pipes, hparams] = executeAll(X, Y, physical, param, verbose);
    Kpipeline = getFinalTopK(pipes, hparams, k)
  }
  # printed regardless of the verbose flag
  print("final top k pipelines \n"+toString(Kpipeline))
}
# The pipeline execution functions
###################################################
# Runs every physical pipeline in `intermediates` once per hyper-parameter
# configuration and records the pipeline, the configuration and the accuracy.
#
# X, Y           feature matrix and label vector
# intermediates  physical pipelines, one per row
# param          hyper-parameter description, forwarded to getInstanceParam
# verbose        print the number of pipelines to execute
#
# Returns
#   topP  one row per executed (pipeline, configuration) pair
#   topH  matching numeric rows laid out as
#         [0, n_1, values_1, ..., n_m, values_m, accuracy]
#         where n_j is the number of hyper-parameters of the j-th primitive;
#         narrower rows are left-padded with zeros so the accuracy is always
#         the last column.
executeAll = function(Matrix[Double] X, Matrix[Double] Y, Frame[String] intermediates, Frame[String] param, Boolean verbose)
return(Frame[String] topP, Matrix[Double] topH)
{
  topP = as.frame("")
  topH = matrix(0,1,1)
  if(verbose)
    print("total pipelines to be executed "+nrow(intermediates))
  for(i in 1:nrow(intermediates)) {
    # per-pipeline accumulators; they are reset for every pipeline so that the
    # rows of earlier pipelines are not appended to topP/topH a second time
    p = as.frame("")
    hp = matrix(0,1,1)
    paraList = getInstanceParam(intermediates[i,], param)
    remaining = 1
    while(remaining > 0) # terminates once all hyper-parameter configurations are executed
    {
      # stays 0 when no primitive has further configurations, which also ends
      # the loop for pipelines without any hyper-parameters
      remaining = 0
      paramL = list()
      tmp_hp = matrix(0,1,1)
      tmp_p = intermediates[i,]
      for(j in 1: length(paraList))
      {
        singleHp = as.matrix(paraList[j])
        paramL = append(paramL, singleHp[1, ])
        # record the number of hyper-parameters followed by their values
        tmp_hp = cbind(tmp_hp, as.matrix(ncol(singleHp)))
        tmp_hp = cbind(tmp_hp, singleHp[1,])
        if(nrow(singleHp) > 1)
        {
          # drop the configuration just consumed; the list ends with an all-zero
          # terminator row, so a zero sum means nothing is left to execute
          singleHp = singleHp[2:nrow(singleHp),]
          paraList[j] = singleHp
          remaining = sum(singleHp)
        }
      }
      # work on a separate result so the input X is never overwritten
      Xt = executePipeline(intermediates[i,], X, paramL, FALSE)
      data = cbind(Y, Xt)
      acc = eval("fclassify", data)
      tmp_hp = cbind(tmp_hp,acc)
      if(ncol(p) == 1 & sum(hp) == 0){
        p = tmp_p
        hp = tmp_hp
      } else {
        p = rbind(p, tmp_p)
        hp = rbind(hp, tmp_hp)
      }
    }
    if(ncol(topP) == 1 & sum(topH) == 0){
      topP = p
      topH = hp
    }
    else {
      # NOTE(review): the padding below is a single row and only fits a one-row p;
      # all rows of `intermediates` share the same width, so this is not expected to trigger
      if(ncol(p) < ncol(topP)){
        margin = ncol(topP) - ncol(p)
        toAppend = topP[1,1:margin]
        toAppend[1,] = ""
        p = cbind(p, toAppend)
      }
      # align the numeric widths independently of the frame padding above
      if(ncol(hp) < ncol(topH))
        hp = cbind(matrix(0,nrow(hp),ncol(topH) - ncol(hp)), hp)
      else if(ncol(hp) > ncol(topH))
        topH = cbind(matrix(0,nrow(topH),ncol(hp) - ncol(topH)), topH)
      topP = rbind(topP, p)
      topH = rbind(topH, hp)
    }
  }
}
# The functions below generate all possible
# physical pipelines for a given logical pipeline
###################################################
# Builds every combination that takes one value from each column of `operators`.
# The first two columns are combined by generatePermutationsOf2; each further
# column is then crossed with the combinations built so far.
# Stops with an error when only a single column is supplied.
generatePermutations = function(Frame[String] operators)
return (Frame[String] combinations)
{
  if(ncol(operators) == 1)
    stop("invalid number of columns")
  if(ncol(operators) <= 2) {
    combinations = generatePermutationsOf2(operators)
  }
  else {
    combos = generatePermutationsOf2(operators[,1:2])
    rest = operators[,3:ncol(operators)]
    for(cIdx in 1:ncol(rest)) {
      filler = combos[,1]      # one-column buffer with as many rows as combos
      newCol = combos[1,1]     # seed row, removed after the loop
      stacked = combos[1,]     # seed row, removed after the loop
      for(rIdx in 1:nrow(rest)) {
        # fill the buffer with the current value and pair it with a full copy of combos
        for(q in 1:nrow(combos))
          filler[q,1] = rest[rIdx,cIdx]
        newCol = rbind(newCol, filler)
        stacked = rbind(stacked, combos)
      }
      stacked = cbind(stacked, newCol)
      combos = stacked[2:nrow(stacked),]
    }
    combinations = combos
  }
}
# Pairs every row value of the second column with the complete first column of a
# two-column frame. The frame is recode-encoded, crossed numerically and decoded
# back into strings.
generatePermutationsOf2 = function(Frame[String] operators )
return(Frame[String] output)
{
  jspecR = "{ids:true, recode:[1,2]}";
  [X, M] = transformencode(target=operators, spec=jspecR);
  left = X[,1]
  pairs = matrix(0,0,2)
  for(i in 1:nrow(X)) {
    # repeat the i-th code of column 2 next to all codes of column 1
    right = matrix(as.scalar(X[i,2]), nrow(X), 1)
    chunk = cbind(left, right)
    if(nrow(pairs) == 0)
      pairs = chunk
    else
      pairs = rbind(pairs, chunk)
  }
  output = transformdecode(target=pairs, spec=jspecR, meta=M);
}
# The functions below generate all possible
# combinations of the different hyper-parameter values
###################################################
# Builds the hyper-parameter configurations for one physical pipeline.
#
# instance  one-row frame with the primitive names of the pipeline
# param     hyper-parameter description; column 1 is the primitive name, column 2
#           the number of hyper-parameters, followed by one (start, increment, end)
#           triple per hyper-parameter
#
# Returns a list with one entry per primitive: a matrix whose rows are the
# configurations to try (columns = hyper-parameters of that primitive) followed
# by an all-zero terminator row, or a 1x1 matrix holding -1 for primitives
# without hyper-parameters.
getInstanceParam = function(Frame[String] instance, Frame[String] param )
return(list[Unknown] L)
{
  L = list();
  parameters = matrix(0,0,1)
  hpNum = matrix(0,1,ncol(instance))
  for(i in 1:ncol(instance)) {
    # stays all-zero (no hyper-parameters) when the primitive is not listed in param
    pVector = matrix(0,1,ncol(param))
    opName = as.scalar(instance[1,i])
    for(j in 1:nrow(param)) {
      if(opName == as.scalar(param[j,1]))
        pVector = as.matrix(param[j,2:ncol(param)])
    }
    hpNum[1,i] = as.scalar(pVector[1,1])
    if(as.scalar(pVector[1,1]) > 0)
    {
      p=1;
      while(p <= as.integer(as.scalar(pVector[1,1])))
      {
        # all values start, start+increment, ..., <= end; seq sizes the vector
        # exactly, so no zero padding or out-of-range write can occur
        kVector = seq(as.scalar(pVector[1,2]), as.scalar(pVector[1,4]), as.scalar(pVector[1,3]))
        # blank the consumed triple and compact the columns so that the next
        # triple moves into columns 2-4
        # NOTE(review): this also drops any later cell that is exactly 0
        pVector[1,2] = 0; pVector[1,3]=0; pVector[1,4]=0;
        pVector = removeEmpty(target = pVector, margin="cols")
        p = p+1
        if(sum(parameters) == 0){
          # first hyper-parameter: grow the placeholder to the needed row count
          parameters = rbind(parameters, matrix(0, nrow(kVector)-nrow(parameters), ncol(parameters)))
          parameters = cbind(parameters, kVector)
        }
        else parameters = getParaCombinations(parameters, kVector)
      }
    }
  }
  index = 1
  # removes the all-zero placeholder column
  parameters = removeEmpty(target = parameters, margin="cols")
  # all-zero terminator row
  parameters = rbind(parameters, matrix(0,1,ncol(parameters)))
  for(i in 1:ncol(instance))
  {
    if(as.scalar(hpNum[1,i]) > 0)
    {
      L = append(L, parameters[,index:(index+as.scalar(hpNum[1,i])-1)])
      index = index+as.scalar(hpNum[1,i])
    }
    else
      L = append(L, matrix(-1,1,1))
  }
}
# Crosses the existing configurations with the values of one more hyper-parameter:
# for every value in `vec` the whole of `para` is repeated and the value is
# appended as a new last column.
getParaCombinations = function(Matrix[Double] para, Matrix[Double] vec)
return (Matrix[Double] para)
{
  nBase = nrow(para)
  repeatedVals = matrix(0,0,1)
  repeatedBase = matrix(0,0,ncol(para))
  for(r in 1:nrow(vec)) {
    repeatedVals = rbind(repeatedVals, matrix(as.scalar(vec[r,1]), nBase, 1))
    repeatedBase = rbind(repeatedBase, para)
  }
  para = cbind(repeatedBase, repeatedVals)
}
# Tells whether `op` names the "mice" primitive.
isSpecial = function(String op)
return(Boolean yes){
  yes = FALSE
  if(op == "mice")
    yes = TRUE
}
# Computes a sum over the entries of a hyper-parameter list.
#
# paraList  list of matrices
# verbose   not used by this function
#
# Returns psum. psum is reassigned on every iteration, so the returned value
# reflects only the last list entry (its sum, or 0.0 when the exists() check fails).
# NOTE(review): presumably an accumulated sum over all entries was intended - confirm.
getPipelineSum = function(List[Unknown] paraList, Boolean verbose)
return (Double psum)
{
for(i in 1:length(paraList))
{
# NOTE(review): exists() is applied to an expression rather than a variable
# name here; verify that this check behaves as intended
if(exists(as.matrix(paraList[i])))
psum = sum(as.matrix(paraList[i]))
else
psum = 0.0
}
}
# This function will compute the top k pipelines from the results of all executions
##################################################################################
# Selects the k executions with the highest accuracy.
#
# pipeline    executed pipelines, one per row
# hparameter  matching numeric rows; the accuracy is the last column
# k           number of rows to return
#
# Returns a frame with the k best rows (fewer when fewer were executed), sorted
# by decreasing accuracy; each row holds the pipeline followed by its
# hyper-parameter columns.
getFinalTopK = function(Frame[String] pipeline, Matrix[Double] hparameter, Integer k)
return (Frame[String] pipeline)
{
  s=""
  for(i in 1: ncol(pipeline), check =0)
    s = s+i+",";
  # encoding categorical columns using recode transformation
  jspecR = "{ids:true, recode:["+s+"]}";
  [X, M] = transformencode(target=pipeline, spec=jspecR);
  nColPip = ncol(pipeline)
  allParam = cbind(X, hparameter)

  # never request more rows than were executed
  topK = min(k, nrow(allParam))

  # sort by accuracy (last column) so that exactly the best rows are kept; the
  # previous max-elimination loop selected at least k+1 rows and truncated them
  # in input order, which could drop the best pipeline, and it did not terminate
  # when at most k rows were available
  sorted = order(target=allParam, by=ncol(allParam), decreasing=TRUE, index.return=FALSE)
  top = sorted[1:topK,]

  X = top[,1:nColPip]
  hparameter = top[,(nColPip+1):ncol(top)]
  pipeline = transformdecode(target=X, spec=jspecR, meta=M);
  pipeline = cbind(pipeline, as.frame(hparameter))
}
# These private functions are used for imputing missing values and for classification
##################################################################################
# Replaces every NaN cell by the mean of the observed (non-NaN) values of its column.
#
# X        numeric matrix, may contain NaN
# verbose  not used by this function
#
# Returns X without NaN cells; a column without any observed value is filled with 0.
imputeByMean = function(Matrix[Double] X, Boolean verbose = FALSE)
return(Matrix[Double] X)
{
  Mask = is.nan(X)
  X = replace(target=X, pattern=NaN, replacement=0)
  # divide by the number of observed cells rather than by nrow(X), otherwise the
  # zero-filled missing cells pull the mean towards 0; clamp to 1 to avoid 0/0
  observed = max(nrow(X) - colSums(Mask), 1)
  colMean = colSums(X) / observed
  X = X + Mask * colMean
}
# Replaces every NaN cell by the median of the observed (non-NaN) values of its column.
#
# X        numeric matrix, may contain NaN
# verbose  not used by this function
#
# Returns X without NaN cells; a column without any observed value is filled with 0.
imputeByMedian = function(Matrix[Double] X, Boolean verbose = FALSE)
return(Matrix[Double] X)
{
  cols = ncol(X)
  colMedian = matrix(0, 1, cols)
  # the mask has to be taken before the NaNs are replaced; taken afterwards it
  # is all zero and nothing gets imputed
  Mask = is.nan(X)
  X = replace(target=X, pattern=NaN, replacement=0)
  parfor(i in 1:cols) {
    # median over the observed cells only, so that the zero-filled missing
    # cells do not distort it
    observed = removeEmpty(target=X[,i], margin="rows", select=(Mask[,i] == 0))
    colMedian[, i] = median(observed)
  }
  X = X + Mask * colMedian
}
# Scores a data set with multinomial logistic regression.
#
# X  matrix whose first column holds the class labels (all >= 1) and whose
#    remaining columns hold the features
#
# The rows are split at random: a row goes to the test set when its uniform
# draw is <= 0.3, otherwise to the training set. Returns the accuracy reported
# by multiLogRegPredict on the test set.
fclassify = function(Matrix[Double] X)
return (Double accuracy)
{
  if(min(X[,1]) < 1)
    stop("Y should contain value greater than zero")

  # 0/1 indicator per row: 1 = test row
  isTest = rand(rows=nrow(X), cols=1, min = 0, max = 1, sparsity=1) <= 0.3
  isTrain = isTest == 0

  # row-selection matrices: the diagonal indicator with its empty rows removed
  pickTest = removeEmpty(target = diag(isTest), margin = "rows")
  pickTrain = removeEmpty(target = diag(isTrain), margin = "rows")
  testSet = pickTest %*% X
  trainSet = pickTrain %*% X

  train_X = trainSet[, 2:ncol(trainSet)]
  train_Y = trainSet[, 1]
  test_X = testSet[, 2:ncol(testSet)]
  test_Y = testSet[, 1]

  betas = multiLogReg(X=train_X, Y=train_Y, icpt=2, tol=1e-9, reg=1.2, maxi=100, maxii=0, verbose=FALSE)
  [prob, yhat, accuracy] = multiLogRegPredict(test_X, betas, test_Y, FALSE)
}
##########################################
## Call the function Enumerator
#########################################
# $1: data (csv with header, label in the first column)
# $2-$5: frames passed to enumerator as logical pipelines, outlier primitives,
#        MVI primitives and hyper-parameter description
# $6: output location for the returned pipelines
data = read($1, data_type="matrix", format="csv", header=TRUE);
# labels are shifted by one before they are handed to the enumerator
labels = data[,1] + 1
features = data[,2:ncol(data)]
logicalPipelines = read($2, data_type="frame", format="csv");
outlierPrims = read($3, data_type="frame", format="csv");
mviPrims = read($4, data_type="frame", format="csv");
hyperParams = read($5, data_type="frame", format="csv");
bestPipelines = enumerator(features, labels, logicalPipelines, outlierPrims, mviPrims, hyperParams, 5, TRUE);
write(bestPipelines, $6, format="csv", sep=",")