| #------------------------------------------------------------- |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| #------------------------------------------------------------- |
| |
| # import |
| source("./scripts/staging/pipelines/permutations.dml") as perm; |
| source("./scripts/staging/pipelines/utils.dml") as utils; |
| |
| enumerator = function(Matrix[Double] X, Matrix[Double] Y, Frame[String] logical, Frame[String] outlierPrimitives, |
| Frame[String] mviPrimitives, Frame[String] param, Integer k, Boolean verbose = TRUE) |
| return(Frame[String] Kpipeline) |
| { |
| for(i in 1:2) {#nrow(logical) |
| operator = as.frame(matrix(0,nrow(outlierPrimitives),1)) #combine all logical primitives |
| for(j in 1:ncol(logical)) |
| { |
| if(as.scalar(logical[i,j]) == "outlier") |
| operator = cbind(operator, outlierPrimitives); |
| else if(as.scalar(logical[i,j]) == "MVI") |
| operator = cbind(operator, mviPrimitives); |
| } |
| operator = operator[,2:ncol(operator)] |
| intermediates = perm::getPermutations(operator) # get the all possible combination of physical primitives |
| # for ith logical pipeline |
| if(verbose) |
| print(" pipelines \n"+toString(intermediates)) |
| [p, h] = executeAll(X, Y,intermediates, param, verbose); |
| |
| Kpipeline = getFinalTopK(p, h, k) |
| print("top k pipelines of "+i+"th logical pipeline "+toString(Kpipeline)) |
| # str = "top k pipelines of iteration "+i |
| # str = append(str, toString(Kpipeline)) |
| } |
| # if(verbose) |
| # print("final top k pipelines \n"+toString(Kpipeline)) |
| # write(str, "D:/Workspace/Pipelines/output/kpipeline.txt") |
| } |
| |
| # The pipeline execution functions |
| ################################################### |
| executeAll = function(Matrix[Double] X, Matrix[Double] Y, Frame[String] intermediates, Frame[String] param, Boolean verbose) |
| return(Frame[String] opt, Matrix[Double] hyper_param) |
| { |
| |
| |
| clone_X = X; |
| # initialize output variables |
| opt = as.frame("NA") |
| hyper_param = matrix(0,0,1) |
| if(verbose) |
| print("total pipelines to be executed "+nrow(intermediates)) |
| for(i in 1:nrow(intermediates)) { |
| paraList = list() |
| op = intermediates[i,] |
| |
| paraList = getInstanceParam(op, param) |
| sum = 1 |
| print("executing "+toString(op)) |
| while(sum > 0) #condition to terminate when all hyper parameters are executed |
| { |
| paramL = list() |
| hp_temp = matrix(0,1,0) |
| opt_temp = op |
| for(j in 1: length(paraList)) |
| { |
| singleHp = as.matrix(paraList[j]) |
| hp_temp = cbind(hp_temp, as.matrix(ncol(singleHp))) |
| hp_temp = cbind(hp_temp, singleHp[1,]) |
| paramL = append(paramL, singleHp[1, ]) |
| if(nrow(singleHp) > 1) |
| { |
| singleHp = singleHp[2:nrow(singleHp),] |
| paraList[j] = singleHp |
| sum = sum(singleHp) |
| } |
| } |
| X = executePipeline(op, X, paramL, FALSE) |
| data = cbind(Y, X) |
| acc = eval("fclassify", data) |
| hp_temp = cbind(hp_temp, acc) |
| X = clone_X |
| if(as.scalar(opt[1,1]) == "NA" & nrow(hyper_param) == 0) |
| { |
| opt = opt_temp |
| hyper_param = hp_temp |
| } |
| else { |
| opt = rbind(opt, opt_temp) |
| hyper_param = rbind(hyper_param, hp_temp) |
| } |
| } |
| X = clone_X |
| } |
| } |
| |
| |
| # The below functions will generate the all possible |
| # combinations for different hyper parameter values |
| ################################################### |
| getInstanceParam = function(Frame[String] instance, Frame[String] param ) |
| return(list[Unknown] L) |
| { |
| L = list(); |
| |
| parameters = matrix(0,0,1) |
| hpNum = matrix(0,1,ncol(instance)) |
| for(i in 1:ncol(instance)) { |
| pVector = matrix(0,1,ncol(param)) |
| coeff = as.scalar(instance[1,i]) |
| for(j in 1:nrow(param)) { |
| if(as.scalar(instance[1,i]) == as.scalar(param[j,1])) |
| pVector = as.matrix(param[j,2:ncol(param)]) |
| } |
| hpNum[1,i] = as.scalar(pVector[1,1]) |
| if(as.scalar(pVector[1,1]) > 0) |
| { |
| p=1; |
| while(p <= as.integer(as.scalar(pVector[1,1]))) |
| { |
| # print("check point 1") |
| count = 1; |
| kVector = matrix(0,as.scalar(pVector[1,4])/as.scalar(pVector[1,3]),1) |
| inner = as.scalar(pVector[1,2]) |
| while(inner <= as.scalar(pVector[1,4])) |
| { |
| kVector[count,1] = inner; |
| inner = inner + as.scalar(pVector[1,3]) |
| count = count+1 |
| } |
| pVector[1,2] = 0; pVector[1,3]=0; pVector[1,4]=0; |
| pVector = removeEmpty(target = pVector, margin="cols") |
| p = p+1 |
| if(sum(parameters) == 0){ |
| parameters = rbind(parameters, matrix(0, nrow(kVector)-nrow(parameters), ncol(parameters))) |
| parameters = cbind(parameters, kVector) |
| } |
| else parameters = getParaCombinations(parameters, kVector) |
| } |
| } |
| } |
| index = 1 |
| parameters = removeEmpty(target = parameters, margin="cols") |
| parameters = rbind(parameters, matrix(0,1,ncol(parameters))) |
| |
| for(i in 1:ncol(instance)) |
| { |
| if(as.scalar(hpNum[1, i]) > 0) |
| { |
| L = append(L, parameters[,index:(index+as.scalar(hpNum[1,i]))-1]) |
| index = index+as.scalar(hpNum[1,i]) |
| } |
| else |
| L = append(L, matrix(-1, 1, 1)) |
| } |
| } |
| |
| getParaCombinations = function(Matrix[Double] para, Matrix[Double] vec) |
| return (Matrix[Double] para) |
| { |
| v_temp = matrix(0,0,1) |
| p_temp = matrix(0,0,ncol(para)) |
| for(i in 1:nrow(vec)) |
| { |
| v = matrix(as.scalar(vec[i,1]), nrow(para), 1) |
| v_temp = rbind(v_temp, v) |
| p_temp = rbind(p_temp,para) |
| } |
| para = cbind(p_temp, v_temp) |
| } |
| |
| isSpecial = function(String op) |
| return(Boolean yes){ |
| yes = (op == "mice") |
| } |
| |
| getPipelineSum = function(List[Unknown] paraList, Boolean verbose) |
| return (Double psum) |
| { |
| for(i in 1:length(paraList)) |
| { |
| if(exists(as.matrix(paraList[i]))) |
| psum = sum(as.matrix(paraList[i])) |
| else |
| psum = 0.0 |
| } |
| } |
| |
| # This function will compute the top k pipelines from the results of all executions |
| ################################################################################## |
| getFinalTopK = function(Frame[String] pipeline, Matrix[Double] hparameter, Integer k) |
| return (Frame[String] pipeline) |
| { |
| if(nrow(pipeline) < k) |
| stop("the top k should be less than the total pipelines") |
| # combine all parameter i.e., operation and hyper-parameter values |
| allParam = cbind(pipeline, as.frame(hparameter)) |
| # get the indexes of columns for recode transformation |
| idx = seq(1, ncol(pipeline)) |
| index = utils::vectorToCsv(idx) |
| # encoding categorical columns using recode transformation |
| jspecR = "{ids:true, recode:["+index+"]}"; |
| [X, M] = transformencode(target=allParam, spec=jspecR); |
| top = order(target=X, by=ncol(X), decreasing=TRUE, index.return=FALSE); |
| pipeline = transformdecode(target=top, spec=jspecR, meta=M); |
| # TODO if k+n pipelines have same accuracy then how to return k pipelines |
| pipeline = pipeline[1:k,] |
| } |
| |
| # These private function are used to impute values by mean and by median |
| ################################################################################## |
| imputeByMean = function(Matrix[Double] X, Boolean verbose = FALSE) |
| return(Matrix[Double] X) |
| { |
| Mask = is.nan(X) |
| X = replace(target=X, pattern=NaN, replacement=0) |
| Mask = Mask * (colMeans(X)) |
| X = X + Mask |
| } |
| |
| imputeByMedian = function(Matrix[Double] X, Boolean verbose = FALSE) |
| return(Matrix[Double] X) |
| { |
| cols = ncol(X) |
| colMedian = matrix(0, 1, cols) |
| Mask = is.nan(X) |
| X = replace(target=X, pattern=NaN, replacement=0) |
| parfor(i in 1:cols) |
| colMedian[, i] = median(X[,i]) |
| Mask = Mask * colMedian |
| X = X + Mask |
| } |
| |
| |
| # Function to evaluate the pipeline using classification accuracy |
| ################################################################################## |
| fclassify = function(Matrix[Double] X) |
| return (Double accuracy) |
| { |
| if(min(X[,1]) < 1) |
| stop("Y should contain value greater than zero") |
| |
| n = nrow(X) |
| d = ncol(X) |
| |
| temp = rand(rows=n, cols=1, min = 0, max = 1, sparsity=1) <= 0.3 |
| tempI = temp == 0 |
| sel = diag(temp) |
| selI = diag(tempI) |
| sel = removeEmpty(target = sel, margin = "rows") |
| selI = removeEmpty(target = selI, margin = "rows") |
| testSet = sel %*% X |
| trainSet = selI %*% X |
| |
| nTrain = nrow(trainSet) |
| dTrain = ncol(trainSet) |
| nTest = nrow(testSet) |
| dTest = ncol(testSet) |
| |
| train_X = trainSet[, 2:dTrain] |
| train_Y = trainSet[, 1] |
| |
| test_X = testSet[, 2:dTest] |
| test_Y = testSet[, 1] |
| |
| betas = multiLogReg(X=train_X, Y=train_Y, icpt=2, tol=1e-9, reg=1.2, maxi=100, maxii=0, verbose=FALSE) |
| [prob, yhat, accuracy] = multiLogRegPredict(test_X, betas, test_Y, FALSE) |
| } |
| |
| # Enumeration call |
| ################################################################################## |
| X = read($1, data_type="matrix", format="csv", header=TRUE); |
| Y = X[,1]+1 |
| X = X[,2:ncol(X)] |
| L = read($2, data_type="frame", format="csv"); |
| OP = read($3, data_type="frame", format="csv"); |
| MVIP = read($4, data_type="frame", format="csv"); |
| param = read($5, data_type="frame", format="csv"); |
| R = enumerator(X, Y, L, OP, MVIP, param, 5, TRUE); |
| write(R, $6, format="csv", sep=",") |