| #------------------------------------------------------------- |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| #------------------------------------------------------------- |
| |
| s_executePipeline = function(Frame[String] logical = as.frame("NULL"), Frame[String] pipeline, Matrix[Double] X, Matrix[Double] Y, |
| Matrix[Double] Xtest, Matrix[Double] Ytest, List[Unknown] metaList, Matrix[Double] hyperParameters, Matrix[Double] hpForPruning = as.matrix(0), |
| Matrix[Double] changesByOp = as.matrix(0), Integer flagsCount, Boolean test = FALSE, Boolean verbose) |
| return (Matrix[Double] X, Matrix[Double] Y, Matrix[Double] Xtest, Matrix[Double] Ytest, Double t2, Matrix[Double] hpForPruning, Matrix[Double] changesByOp) |
| { |
| mask=as.matrix(metaList['mask']) |
| FD = as.matrix(metaList['fd']) |
| |
| cloneY = Y |
| Xorig = X |
| # # combine X and Y |
| n = nrow(X) |
| d = ncol(Xorig) |
| X = rbind(X, Xtest) |
| Y = rbind(Y, Ytest) |
| testRow = nrow(Xtest) |
| Xout = X |
| t1 = time() |
| #print("PIPELINE EXECUTION START ... "+toString(pipeline)) |
| if(verbose) { |
| print("checks rows in X = "+nrow(X)+" rows in Y = "+nrow(Y)+" cols in X = "+ncol(X)+" col in Y = "+ncol(Y)) |
| print("pipeline in execution "+toString(pipeline)) |
| print("pipeline hps "+toString(hyperParameters)) |
| } |
| for(i in 1:ncol(pipeline)) { |
| trainEndIdx = (nrow(X) - nrow(Xtest)) |
| testStIdx = trainEndIdx + 1 |
| op = as.scalar(pipeline[1,i]) |
| lgOp = as.scalar(logical[1,i]) |
| |
| if(test == FALSE | lgOp != "CI") { |
| Xclone = X |
| [hp, dataFlag, yFlag, executeFlag] = matrixToList(X, Y, mask, FD, hyperParameters[i], flagsCount, op) |
| if(executeFlag == 1) { |
| X = eval(op, hp) |
| Xout = X |
| X = confirmData(X, Xclone, mask, dataFlag, yFlag) |
| # dataFlag 0 = only on numeric, 1 = on whole data |
| if(yFlag) |
| { |
| Y = X[, ncol(X)] |
| X = X[, 1:ncol(X) - 1] |
| } |
| X = confirmMeta(X, mask) |
| } |
| else { |
| print("not applying "+op+" executeFlag = 0") |
| } |
| } |
| else { |
| Xclone = X |
| #print("not applying "+lgOp+" "+op+" on data test flag: "+test) |
| Xtest = X[testStIdx:nrow(X), ] |
| Ytest = Y[testStIdx:nrow(X), ] |
| X = X[1:trainEndIdx, ] |
| Y = Y[1:trainEndIdx, ] |
| [hp, dataFlag, yFlag, executeFlag] = matrixToList(X, Y, mask, FD, hyperParameters[i], flagsCount, op) |
| if(executeFlag == 1) |
| { |
| X = eval(op, hp) |
| X = confirmData(X, Xclone, mask, dataFlag, yFlag) |
| # dataFlag 0 = only on numeric, 1 = on whole data |
| if(yFlag) |
| { |
| Y = X[, ncol(X)] |
| X = X[, 1:ncol(X) - 1] |
| } |
| X = confirmMeta(X, mask) |
| X = rbind(X, Xtest) |
| Y = rbind(Y, Ytest) |
| } |
| else { |
| print("not applying "+op+" executeFlag = 0") |
| } |
| } |
| if(as.scalar(pipeline[1, i]) == "outlierBySd" | as.scalar(pipeline[1, i]) == "outlierByIQR" | as.scalar(pipeline[1, i]) == "imputeByFd") { |
| changes = sum(abs(replace(target=Xout, pattern=NaN, replacement=0) - replace(target=as.matrix(hp[1]), pattern=NaN, replacement=0)) > 0.001 ) |
| [hpForPruning, changesByOp] = storeDataForPrunning(pipeline, hyperParameters, hpForPruning, changesByOp, changes, i) |
| } |
| } |
| Xtest = X[testStIdx:nrow(X), ] |
| Ytest = Y[testStIdx:nrow(X), ] |
| X = X[1:trainEndIdx] |
| Y = Y[1:trainEndIdx] |
| # # # do a quick validation check |
| if(nrow(Xtest) != testRow) |
| stop("executePipeline: test rows altered") |
| t2 = floor((time() - t1) / 1e+6) |
| |
| #print("PIPELINE EXECUTION ENDED: "+t2+" ms") |
| } |
| |
| # This function will convert the matrix row-vector into list |
| matrixToList = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, Matrix[Double] FD, |
| Matrix[Double] p, Integer flagsCount, String op) |
| return (List[Unknown] l, Integer dataFlag, Integer yFlag, Integer executeFlag) |
| { |
| NUM_META_FLAGS = flagsCount; |
| dataFlag = as.integer(as.scalar(p[1, ncol(p)])) |
| hasVerbose = as.integer(as.scalar(p[1, ncol(p) - 1])) |
| yFlag = as.integer(as.scalar(p[1, ncol(p) - 2])) |
| fDFlag = as.integer(as.scalar(p[1, ncol(p)-3])) |
| maskFlag = as.integer(as.scalar(p[1, ncol(p)-4])) |
| |
| executeFlag = 1 |
| ###################################################### |
| # CHECK FOR DATA FLAG |
| if(dataFlag == 0) |
| { |
| if(sum(mask) == ncol(mask)) |
| executeFlag = 0 |
| else { |
| # take numerics out and remove categorical |
| X = removeEmpty(target=X, margin = "cols", select = (mask == 0)) |
| } |
| } |
| else if(dataFlag == 1) |
| { |
| if(sum(mask) == 0) |
| executeFlag = 0 |
| else { |
| # take categorical out and remove numerics |
| X = removeEmpty(target=X, margin = "cols", select = mask) |
| } |
| } |
| |
| l = list(X) |
| |
| ###################################################### |
| # CHECK FOR Y APPEND FLAG |
| |
| if(yFlag == 1) { |
| l = append(l, Y) |
| } |
| ###################################################### |
| # CHECK FOR FD APPEND FLAG |
| if(fDFlag == 1) |
| { |
| l = append(l, FD) |
| } |
| |
| ###################################################### |
| # CHECK FOR MASK APPEND FLAG |
| if(maskFlag == 1) |
| { |
| l = append(l, mask) |
| } |
| ##################################################### |
| # POPULATE HYPER PARAM |
| # get the number of hyper-parameters and loop till that |
| no_of_hyperparam = as.scalar(p[1,1]) |
| if(no_of_hyperparam > 0) { |
| for(i in 1:no_of_hyperparam) |
| l = append(l, as.scalar(p[1,(i+1)])) |
| } |
| ###################################################### |
| # CHECK FOR VERBOSE FLAG |
| if(hasVerbose == 1) |
| l = append(l, FALSE) |
| |
| } |
| |
| confirmMeta = function(Matrix[Double] X, Matrix[Double] mask) |
| return (Matrix[Double] X) |
| { |
| if((sum(mask) > 0) & (ncol(X) == ncol(mask))) |
| { |
| # get the max + 1 for nan replacement |
| nanMask = is.na(X) |
| # replace nan |
| X = replace(target = X, pattern = NaN, replacement = 9999) |
| # take categorical out |
| cat = removeEmpty(target=X, margin="cols", select = mask) |
| # round categorical (if there is any floating point) |
| cat = round(cat) |
| # reconstruct original X |
| X = X * (mask == 0) |
| q = table(seq(1, ncol(cat)), removeEmpty(target=seq(1, ncol(mask)), margin="rows", |
| select=t(mask)), ncol(cat), ncol(X)) |
| X = (cat %*% q) + X |
| |
| # put nan back |
| nanMask = replace(target = nanMask, pattern = 1, replacement = NaN) |
| X = X + nanMask |
| # print("X less than equal to zero "+sum(cat <= 0)) |
| } |
| } |
| |
| |
| confirmData = function(Matrix[Double] nX, Matrix[Double] originalX, Matrix[Double] mask, Integer dataFlag, Integer yFlag) |
| return (Matrix[Double] X) |
| { |
| if(yFlag == 1) |
| { |
| Y = nX[, ncol(nX)] |
| nX = nX[, 1: ncol(nX) - 1] |
| |
| } |
| if(dataFlag == 0 & (sum(mask) > 0) & (sum(mask) != ncol(originalX))) |
| { |
| maxDummy = max(nX) + 1 |
| nX = replace(target = nX, pattern = NaN, replacement = maxDummy) |
| # X without numerics |
| Xcat = removeEmpty(target=originalX, margin="cols", select=mask) |
| nanMask = is.na(Xcat) |
| Xcat = replace(target = Xcat, pattern = NaN, replacement = -1111) |
| # print("unchanged data \n"+toString(originalX, rows=10)) |
| |
| # reconstruct the original matrix |
| p = table(seq(1, ncol(nX)), removeEmpty(target=seq(1, ncol(mask)), margin="rows", |
| select=t(mask==0)), ncol(nX), ncol(originalX)) |
| q = table(seq(1, ncol(Xcat)), removeEmpty(target=seq(1, ncol(mask)), margin="rows", |
| select=t(mask)), ncol(Xcat), ncol(originalX)) |
| X = (nX %*% p) + (Xcat %*% q) |
| |
| X = replace(target = X, pattern = maxDummy, replacement = NaN) |
| X = replace(target = X, pattern = -1111, replacement = NaN) |
| } |
| else if(dataFlag == 1 & (sum(mask) > 0) & (sum(mask) != ncol(originalX))) |
| { |
| maxDummy = max(nX) + 1 |
| nX = replace(target = nX, pattern = NaN, replacement = maxDummy) |
| # X without categorical |
| Xnum = removeEmpty(target=originalX, margin="cols", select=(mask==0)) |
| nanMask = is.na(Xnum) |
| Xnum = replace(target = Xnum, pattern = NaN, replacement = -1111) |
| # reconstruct the original matrix |
| p = table(seq(1, ncol(Xnum)), removeEmpty(target=seq(1, ncol(mask)), margin="rows", |
| select=t(mask==0)), ncol(Xnum), ncol(originalX)) |
| q = table(seq(1, ncol(nX)), removeEmpty(target=seq(1, ncol(mask)), margin="rows", |
| select=t(mask)), ncol(nX), ncol(originalX)) |
| X = (nX %*% q) + (Xnum %*% p) |
| X = replace(target = X, pattern = maxDummy, replacement = NaN) |
| X = replace(target = X, pattern = -1111, replacement = NaN) |
| |
| } |
| else X = nX |
| # print("recreated data \n"+toString(X, rows = 20)) |
| |
| if(yFlag == 1) |
| X = cbind(X, Y) |
| |
| } |
| |
| |
| ####################################################################### |
| # Wrapper of transformencode OHE call, to call inside eval as a function |
| # Inputs: The input dataset X, and mask of the columns |
| # Output: OHEd matrix X |
| ####################################################################### |
| |
| dummycoding = function(Matrix[Double] X, Matrix[Double] mask) |
| return (Matrix[Double] dX_train) { |
| |
| if(sum(mask) > 0) |
| { |
| X = replace(target=X, pattern=NaN, replacement=1) |
| idx = vectorToCsv(mask) |
| # specifications for one-hot encoding of categorical features |
| jspecDC = "{ids:true, dummycode:["+idx+"]}"; |
| # OHE of categorical features |
| [dX_train, dM] = transformencode(target=as.frame(X), spec=jspecDC); |
| } |
| else dX_train = X |
| } |
| |
| |
| ####################################################################### |
| # Wrapper of imputeByFD OHE call, to call inside eval as a function |
| # Inputs: The input dataset X, and mask of the columns and threshold value |
| # Output: filled matrix X |
| ####################################################################### |
| |
| imputeByFd = function(Matrix[Double] X, Matrix[Double] fdMask, Double threshold) |
| return (Matrix[Double] X_filled) |
| { |
| if(sum(fdMask) > 0) |
| { |
| fdMask = removeEmpty(target=fdMask, margin="cols") |
| FD = discoverFD(X=replace(target=X, pattern=NaN, replacement=1), Mask=fdMask, threshold=threshold) |
| FD = (diag(matrix(1, rows=nrow(FD), cols=1)) ==0) * FD |
| FD = FD > 0 |
| if(sum(FD) > 0) |
| { |
| for(i in 1: nrow(FD)) |
| { |
| for(j in 1:ncol(FD)) { |
| if(as.scalar(FD[i, j]) > 0 & (min(X[, i]) != 0) & (min(X[, j]) != 0) & (sum(FD[, j]) != nrow(FD)) |
| & (as.scalar(fdMask[1, j]) != 0) & (as.scalar(fdMask[1, i]) != 0)) |
| X = imputeByFD(X, i, j, threshold, FALSE) |
| } |
| } |
| } |
| } |
| X_filled = X |
| } |
| |
| ####################################################################### |
| # Wrapper of na_lof to call inside eval as a function |
| # Output: filled matrix X |
| ####################################################################### |
| |
| forward_fill = function(Matrix[Double] X, Boolean op, Boolean verbose) |
| return (Matrix[Double] X_filled) |
| { |
| option = ifelse(op, "locf", "nocb") |
| X_filled = na_locf(X=X, option=option, verbose=verbose) |
| } |
| |
| |
| |
| # smote wrapper for doing relative over-sampling |
| SMOTE = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, Integer remainingRatio, Boolean verbose) |
| return (Matrix[Double] XY) |
| { |
| # get the class count |
| classes = table(Y[, 1], 1) |
| minClass = min(classes) |
| maxClass = max(classes) |
| diff = (maxClass - minClass)/sum(classes) |
| if(diff > 0.5) |
| { |
| #print("initiating oversampling") |
| XY = order(target = cbind(Y, X), by = 1, decreasing=FALSE, index.return=FALSE) |
| synthesized = matrix(0,0,0) # initialize variable |
| start_class = 1 |
| end_class = 0 |
| k = table(XY[, 1], 1) |
| getMax = max(k) |
| maxKIndex = as.scalar(rowIndexMax(t(k))) |
| outSet = matrix(0, 0, ncol(XY)) |
| remainingRatio = ifelse((remainingRatio%%100) >= 50, remainingRatio+(100 - (remainingRatio%%100)), |
| remainingRatio-(remainingRatio%%100)) |
| #print("remaining ratio: "+remainingRatio) |
| for(i in 1: nrow(k), check=0) { |
| end_class = end_class + as.scalar(classes[i]) |
| class_t = XY[start_class:end_class, ] |
| if((i != maxKIndex)) { |
| synthesized = smote(class_t[, 2:ncol(XY)], mask, remainingRatio, 1, FALSE) |
| synthesized = cbind(matrix(as.scalar(class_t[2,1]), nrow(synthesized), 1), synthesized) |
| outSet = rbind(outSet, synthesized) |
| } |
| start_class = end_class + 1 |
| } |
| |
| XY = rbind(XY, synthesized) |
| Y = XY[, 1] |
| X = XY[, 2:ncol(XY)] |
| XY = cbind(X,Y) |
| classes = table(Y, 1) |
| } |
| else { |
| #print("smote not applicable") |
| XY = cbind(X, Y) |
| } |
| } |
| |
| |
| ######################################################## |
| # The function will replace the null with default values |
| ######################################################## |
| fillDefault = function(Matrix[Double] X) |
| return(Matrix[Double] X){ |
| defaullt = round(colMaxs(X) - colMins(X)) |
| Mask = is.na(X) |
| X = replace(target=X, pattern=NaN, replacement=0) |
| Mask = Mask * defaullt |
| X = X + Mask |
| # print("fillDefault: no of NaNs "+sum(is.na(X))) |
| } |
| |
| ######################################################## |
| # A slightly changes version of PCA |
| ######################################################## |
| m_pca = function(Matrix[Double] X, Integer K=2, Boolean center=TRUE, Boolean scale=TRUE) |
| return (Matrix[Double] Xout) |
| { |
| |
| if(K < ncol(X) - 1) { |
| N = nrow(X); |
| D = ncol(X); |
| |
| # perform z-scoring (centering and scaling) |
| [X, Centering, ScaleFactor] = scale(X, center, scale); |
| |
| # co-variance matrix |
| mu = colSums(X)/N; |
| C = (t(X) %*% X)/(N-1) - (N/(N-1))*t(mu) %*% mu; |
| # compute eigen vectors and values |
| [evalues, evectors] = eigen(C); |
| if(nrow(evalues) > 1 & nrow(evectors) > 1) |
| { |
| decreasing_Idx = order(target=evalues,by=1,decreasing=TRUE,index.return=TRUE); |
| diagmat = table(seq(1,D),decreasing_Idx); |
| # sorts eigenvalues by decreasing order |
| evalues = diagmat %*% evalues; |
| # sorts eigenvectors column-wise in the order of decreasing eigenvalues |
| evectors = evectors %*% diagmat; |
| |
| eval_dominant = evalues[1:K, 1]; |
| evec_dominant = evectors[,1:K]; |
| |
| # Construct new data set by treating computed dominant eigenvectors as the basis vectors |
| Xout = X %*% evec_dominant; |
| Mout = evec_dominant; |
| } |
| else Xout = X # these elses could be removed via initiating Xout = X for now they are here for readability |
| } |
| else Xout = X |
| Xout = replace(target=Xout, pattern=1/0, replacement=0); |
| } |
| |
| wtomeklink = function(Matrix[Double] X, Matrix[Double] y) |
| return (Matrix[Double] XY) { |
| [Xunder, Yunder, rmv] = tomeklink(X, y) |
| XY = cbind(Xunder, Yunder) |
| } |
| |
| storeDataForPrunning = function(Frame[Unknown] pipeline, Matrix[Double] hp, Matrix[Double] hpForPruning, Matrix[Double] changesByOp, Integer changes, Integer i) |
| return(Matrix[Double] hpForPruning, Matrix[Double] changesByOp) |
| { |
| if(ncol(hpForPruning) > 1) { |
| hpForPruning[1, i] = hp[i, 2] |
| changesByOp[1, i] = changes |
| } |
| } |
| |