| #------------------------------------------------------------- |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| #------------------------------------------------------------- |
| |
| # Executes the operators of one pipeline, in order, on the training data and, for every |
| # operator that has an apply function, replays the fitted state on the test data. |
| # |
| # INPUT: |
| # ---------------------------------------------------------------------------------------- |
| # pipeline         --- frame; cell [1, i] holds the name of the i-th operator (run via evalList) |
| # Xtrain           --- training features |
| # Ytrain           --- training labels |
| # Xtest            --- test features |
| # Ytest            --- test labels; returned unchanged |
| # metaList         --- list read by name: 'mask', 'applyFunc' and 'fd' |
| # hyperParameters  --- row i is decoded by matrixToList into the arguments of the i-th operator |
| # hpForPruning     --- only updated when it has more than one column (see below) |
| # changesByOp      --- updated together with hpForPruning |
| # flagsCount       --- forwarded to matrixToList |
| # test             --- not read by this function |
| # verbose          --- if TRUE, prints the input dimensions, the pipeline and its hyper-parameters |
| # ---------------------------------------------------------------------------------------- |
| # |
| # OUTPUT: |
| # -------------------------------------------------------------------------------------- |
| # Xtrain           --- training features after all executed operators |
| # Ytrain           --- training labels (replaced by operators whose yFlag is 1) |
| # Xtest            --- test features after all apply functions |
| # Ytest            --- same as the input |
| # t2               --- floor((time() - t1) / 1e+6); milliseconds if time() is in nanoseconds (TODO confirm) |
| # hpForPruning     --- column i set to hyperParameters[i, 2] for outlierBySd, outlierByIQR, imputeByFd |
| # changesByOp      --- column i set to the number of test cells changed by that operator |
| # changesAll       --- number of test cells that differ from the original Xtest (last computed value) |
| # internalStates   --- per executed operator: its fitted outputs, or a frame "NA" without apply function |
| # -------------------------------------------------------------------------------------- |
| |
| s_executePipeline = function(Frame[String] pipeline, Matrix[Double] Xtrain, Matrix[Double] Ytrain, |
|   Matrix[Double] Xtest, Matrix[Double] Ytest, List[Unknown] metaList, Matrix[Double] hyperParameters, Matrix[Double] hpForPruning = as.matrix(0), |
|   Matrix[Double] changesByOp = as.matrix(0), Integer flagsCount, Boolean test = FALSE, Boolean verbose) |
|   return (Matrix[Double] Xtrain, Matrix[Double] Ytrain, Matrix[Double] Xtest, Matrix[Double] Ytest, |
|   Double t2, Matrix[Double] hpForPruning, Matrix[Double] changesByOp, Double changesAll, List[Unknown] internalStates) |
| { |
|   internalStates = list() |
|   mask = as.matrix(metaList['mask']) |
|   applyFunc = as.frame(metaList['applyFunc']) |
|   changesAll = 0.0 |
|   d = ncol(Xtrain) |
|   Xorig = Xtest |
|   t1 = time() |
|   if(verbose) { |
|     print("checks rows in X = "+nrow(Xtrain)+" rows in Y = "+nrow(Ytrain)+" cols in X = "+ncol(Xtrain)+" col in Y = "+ncol(Ytrain)) |
|     print("pipeline in execution "+toString(pipeline)) |
|     print("pipeline hps "+toString(hyperParameters)) |
|   } |
|   for(i in 1:ncol(pipeline)) { |
|     op = as.scalar(pipeline[1,i]) |
|     applyOp = toString(as.scalar(applyFunc[1,i])) |
|     # snapshots used to restore untouched columns and to count changed cells |
|     Xclone = Xtrain |
|     XtestClone = Xtest |
|     [hp, dataFlag, yFlag, executeFlag] = matrixToList(Xtrain, Ytrain, mask, as.matrix(metaList['fd']), hyperParameters[i,], flagsCount, op) |
|     if(executeFlag == 1) { |
|       L = evalList(op, hp) |
|       # the first output of every operator is the transformed training matrix |
|       [L, O] = remove(L, 1); |
|       Xtrain = as.matrix(O) |
|       if(applyOp != "NA") { |
|         [Xtest, executeFlag] = applyDataFlag(Xtest, mask, dataFlag) |
|         # the remaining outputs are the fitted state; keep them and pass them to the apply function |
|         internalStates = append(internalStates, L) |
|         L = append(L, list(X=Xtest)); |
|         Xtest = eval(applyOp, L); |
|         Xtest = confirmData(Xtest, XtestClone, mask, dataFlag) |
|       } |
|       else { |
|         internalStates = append(internalStates, as.frame("NA")) |
|       } |
|       # dataFlag (see applyDataFlag): 0 = columns with mask == 0, 1 = columns with mask set, |
|       # any other value = all columns; confirmData merges the untouched columns back in |
|       Xtrain = confirmData(Xtrain, Xclone, mask, dataFlag) |
| |
|       # matrixToList appends Y only for yFlag == 1, so only then is a label output expected |
|       if(yFlag == 1) { |
|         [L, Y] = remove(L, 1); |
|         Ytrain = as.matrix(Y) |
|       } |
|     } |
|     else { |
|       print("not applying operation executeFlag = 0") |
|     } |
| |
|     # change statistics are only meaningful while the test matrix keeps its shape |
|     if(ncol(Xtest) == d & nrow(Xtest) == nrow(XtestClone) & ncol(hpForPruning) > 1) { |
|       # NaN is mapped to 0 so that missing cells take part in the comparison |
|       XtestNoNaN = replace(target=Xtest, pattern=NaN, replacement=0) |
|       changesSingle = sum(abs(XtestNoNaN - replace(target=XtestClone, pattern=NaN, replacement=0)) > 0.001 ) |
|       changesAll = sum(abs(XtestNoNaN - replace(target=Xorig, pattern=NaN, replacement=0)) > 0.001 ) |
| |
|       if(op == "outlierBySd" | op == "outlierByIQR" | op == "imputeByFd") { |
|         hpForPruning[1, i] = hyperParameters[i, 2] |
|         changesByOp[1, i] = changesSingle |
|       } |
|     } |
|   } |
| |
|   t2 = floor((time() - t1) / 1e+6) |
| } |
| |
| # Converts one hyper-parameter row into the argument list of an operator. |
| # |
| # Layout of the row vector p as read below: |
| #   p[1, 1]            number n of hyper-parameter values |
| #   p[1, 2 .. n+1]     the hyper-parameter values |
| #   p[1, ncol(p)-4]    maskFlag   (1 = append mask) |
| #   p[1, ncol(p)-3]    fDFlag     (1 = append FD) |
| #   p[1, ncol(p)-2]    yFlag      (1 = append Y) |
| #   p[1, ncol(p)-1]    hasVerbose (1 = append a FALSE verbose argument) |
| #   p[1, ncol(p)]      dataFlag   (column selection, see applyDataFlag) |
| # |
| # The list is built in the order X, Y, FD, mask, hyper-parameters, verbose. |
| # flagsCount and op are accepted but not read here. |
| # Returns the list plus dataFlag, yFlag and the executeFlag reported by applyDataFlag. |
| matrixToList = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, Matrix[Double] FD, |
|   Matrix[Double] p, Integer flagsCount, String op) |
|   return (List[Unknown] l, Integer dataFlag, Integer yFlag, Integer executeFlag) |
| { |
|   # the five trailing columns of p carry the flags |
|   last = ncol(p) |
|   dataFlag = as.integer(as.scalar(p[1, last])) |
|   hasVerbose = as.integer(as.scalar(p[1, last - 1])) |
|   yFlag = as.integer(as.scalar(p[1, last - 2])) |
|   fDFlag = as.integer(as.scalar(p[1, last - 3])) |
|   maskFlag = as.integer(as.scalar(p[1, last - 4])) |
| |
|   # restrict X to the columns selected by dataFlag; this always becomes the first argument |
|   [X, executeFlag] = applyDataFlag(X, mask, dataFlag) |
|   l = list(X) |
| |
|   # optional matrix arguments |
|   if(yFlag == 1) { |
|     l = append(l, Y) |
|   } |
|   if(fDFlag == 1) { |
|     l = append(l, FD) |
|   } |
|   if(maskFlag == 1) { |
|     l = append(l, mask) |
|   } |
| |
|   # scalar hyper-parameters: the count sits in column 1, the values follow it |
|   no_of_hyperparam = as.scalar(p[1,1]) |
|   if(no_of_hyperparam > 0) { |
|     for(i in 1:no_of_hyperparam) |
|       l = append(l, as.scalar(p[1,(i+1)])) |
|   } |
| |
|   # operators that take a verbose argument are always called with FALSE |
|   if(hasVerbose == 1) |
|     l = append(l, FALSE) |
| } |
| |
| # Selects the columns of X an operator should see. |
| #   dataFlag == 0: keep the columns with mask == 0 (numeric); nothing to do if every column is masked |
| #   dataFlag == 1: keep the columns with mask set (categorical); nothing to do if no column is masked |
| #   otherwise:     X is passed through unchanged |
| # executeFlag is 0 when the requested column group is empty and 1 in all other cases. |
| applyDataFlag = function(Matrix[Double] X, Matrix[Double] mask, Integer dataFlag) |
|   return(Matrix[Double] X,Integer executeFlag) |
| { |
|   executeFlag = 1 |
|   nMasked = sum(mask) |
|   if(dataFlag == 0) { |
|     if(nMasked == ncol(mask)) |
|       executeFlag = 0 |
|     else |
|       X = removeEmpty(target=X, margin = "cols", select = (mask == 0)) |
|   } |
|   else if(dataFlag == 1) { |
|     if(nMasked == 0) |
|       executeFlag = 0 |
|     else if(nMasked != ncol(mask)) |
|       X = removeEmpty(target=X, margin = "cols", select = mask) |
|     # all columns masked: X already is the requested subset |
|   } |
| } |
| |
| # Re-assembles a full-width matrix after an operator ran on a column subset. |
| # |
| # nX        --- result of the operator on the selected columns |
| # originalX --- matrix before the operator; supplies the columns that were not selected |
| # mask      --- row vector; 1 marks the columns that dataFlag == 1 selects, 0 those of dataFlag == 0 |
| # dataFlag  --- 0: nX holds the mask == 0 columns, 1: nX holds the masked columns |
| # |
| # If the mask is all-zero or all-one, or dataFlag is neither 0 nor 1, nX is returned as is. |
| # |
| # Missing values are tracked with explicit NaN position masks. Sentinel values would turn |
| # real cells that happen to equal the sentinel into NaN. |
| confirmData = function(Matrix[Double] nX, Matrix[Double] originalX, Matrix[Double] mask, Integer dataFlag) |
|   return (Matrix[Double] X) |
| { |
|   X = nX |
|   isPartial = (sum(mask) > 0) & (sum(mask) != ncol(originalX)) |
|   if(isPartial & (dataFlag == 0 | dataFlag == 1)) |
|   { |
|     # target positions of the two column groups in the full matrix |
|     colIds = seq(1, ncol(mask)) |
|     numPos = removeEmpty(target=colIds, margin="rows", select=t(mask==0)) |
|     catPos = removeEmpty(target=colIds, margin="rows", select=t(mask)) |
| |
|     if(dataFlag == 0) { |
|       # operator output replaces the mask == 0 columns, masked columns come from the original |
|       numPart = nX |
|       catPart = removeEmpty(target=originalX, margin="cols", select=mask) |
|     } |
|     else { |
|       # operator output replaces the masked columns, mask == 0 columns come from the original |
|       catPart = nX |
|       numPart = removeEmpty(target=originalX, margin="cols", select=(mask==0)) |
|     } |
| |
|     # 0/1 placement matrices: row k has its single 1 in the target column of the k-th group column |
|     p = table(seq(1, ncol(numPart)), numPos, ncol(numPart), ncol(originalX)) |
|     q = table(seq(1, ncol(catPart)), catPos, ncol(catPart), ncol(originalX)) |
| |
|     # place the NaN indicators with the same matrices, then multiply NaN-free copies |
|     # (NaN must not enter the matrix product, it would spread over whole rows) |
|     nanCells = (is.na(numPart) %*% p) + (is.na(catPart) %*% q) |
|     X = (replace(target=numPart, pattern=NaN, replacement=0) %*% p) |
|       + (replace(target=catPart, pattern=NaN, replacement=0) %*% q) |
| |
|     # every target column is fed by exactly one source column, so nanCells is 0/1; |
|     # adding NaN at the flagged cells restores the missing values |
|     X = X + replace(target=nanCells, pattern=1, replacement=NaN) |
|   } |
| } |
| |
| |
| |
| ####################################################################### |
| # One-hot-encoding wrapper around transformencode, callable through eval. |
| # Inputs:  X    - input matrix |
| #          mask - row vector passed to vectorToCsv to build the dummycode column list |
| # Outputs: X     - encoded matrix, or the input if sum(mask) is 0 |
| #          jspec - transform specification that was used, "" if nothing was encoded |
| #          meta  - metadata frame from transformencode, a frame holding "NULL" if nothing was encoded |
| ####################################################################### |
| |
| dummycoding = function(Matrix[Double] X, Matrix[Double] mask) |
|   return (Matrix[Double] X, String jspec, Frame[Unknown] meta) { |
| |
|   # defaults for the case that no column is flagged |
|   jspec = "" |
|   meta = as.frame("NULL") |
| |
|   needsEncoding = sum(mask) > 0 |
|   if(needsEncoding) |
|   { |
|     # NaN is replaced by 0 before encoding |
|     Xzero = replace(target=X, pattern=NaN, replacement=0) |
|     # specification: encode by column id, dummycode the columns listed by vectorToCsv |
|     jspec = "{ids:true, dummycode:[" + vectorToCsv(mask) + "]}"; |
|     [X, meta] = transformencode(target=as.frame(Xzero), spec=jspec); |
|   } |
| } |
| |
| |
| # Applies a transform specification produced by dummycoding to new data. |
| # An empty jspec means nothing was encoded at fit time, so X is returned unchanged. |
| dummycodingApply = function(Matrix[Double] X, String jspec, Frame[Unknown] meta) |
|   return (Matrix[Double] Y) { |
| |
|   if(jspec == "") |
|     Y = X |
|   else { |
|     Y = transformapply(target=as.frame(X), spec=jspec, meta=meta); |
|   } |
| } |
| |
| ####################################################################### |
| # Wrapper around discoverFD / imputeByFD, callable through eval. |
| # Inputs:  X         - input matrix |
| #          fdMask    - row vector; nothing is done when its sum is 0 |
| #          threshold - passed to discoverFD and imputeByFD |
| # Outputs: X          - matrix with the dependent columns overwritten by imputeByFD |
| #          fillMatrix - row ncol(FD) * (i - 1) + j stores the second output of |
| #                       imputeByFD for the pair (column i -> column j), zero padded; |
| #                       as.matrix(0) when sum(fdMask) is 0 |
| ####################################################################### |
| |
| imputeByFd = function(Matrix[Double] X, Matrix[Double] fdMask, Double threshold) |
|   return (Matrix[Double] X, Matrix[Double] fillMatrix) |
| { |
|   fillMatrix = as.matrix(0) |
|   if(sum(fdMask) > 0) |
|   { |
|     # discoverFD runs on a copy in which NaN is replaced by 1 |
|     Xfilled = replace(target=X, pattern=NaN, replacement=1) |
|     fdMask = removeEmpty(target=fdMask, margin="cols") |
|     FD = discoverFD(X=Xfilled, Mask=fdMask, threshold=threshold) |
|     # zero the diagonal and binarize |
|     offDiagonal = (diag(matrix(1, rows=nrow(FD), cols=1)) == 0) |
|     FD = (offDiagonal * FD) > 0 |
|     nFd = ncol(FD) |
|     # one row per (i, j) pair; the width max(Xfilled) bounds the length of a stored vector |
|     fillMatrix = matrix(0, rows=nFd * nFd, cols=max(Xfilled)) |
|     if(sum(FD) > 0) |
|     { |
|       for(src in 1:nrow(FD)) { |
|         for(dst in 1:nFd) { |
|           if(as.scalar(FD[src, dst]) > 0 & (min(X[, src]) != 0) & (min(X[, dst]) != 0) & (sum(FD[, dst]) != nrow(FD)) |
|             & (as.scalar(fdMask[1, dst]) != 0) & (as.scalar(fdMask[1, src]) != 0)) { |
|             [imputedCol, imp] = imputeByFD(X[, src], X[, dst], threshold, FALSE) |
|             # X is updated in place, so later pairs see the already imputed column |
|             X[, dst] = imputedCol |
|             fillMatrix[nFd * (src - 1) + dst, 1:nrow(imp)] = t(imp) |
|           } |
|         } |
|       } |
|     } |
|   } |
| } |
| |
| # Replays the imputations recorded by imputeByFd on new data. |
| # fillMatrix is read as d * d rows with d = sqrt(nrow(fillMatrix)); row d * (i - 1) + j |
| # belongs to the pair (column i -> column j). Rows whose sum is not positive are skipped. |
| imputeByFdApply = function(Matrix[Double] X, Matrix[Double] fillMatrix) |
|   return (Matrix[Double] X) |
| { |
|   d = sqrt(nrow(fillMatrix)) |
|   for(src in 1:d) |
|   { |
|     for(dst in 1:d) { |
|       packed = fillMatrix[d * (src - 1) + dst, ] |
|       if(sum(packed) > 0) { |
|         # drop the zero entries (padding) and turn the row into a column vector |
|         imp = removeEmpty(target=t(packed), margin="rows") |
|         X[, dst] = imputeByFDApply(X[, src], imp) |
|       } |
|     } |
|   } |
| } |
| |
| ####################################################################### |
| # Wrapper around na_locf, callable through eval. |
| # Inputs:  X       - input matrix |
| #          op      - TRUE selects option "locf", FALSE selects option "nocb" |
| #          verbose - passed to na_locf |
| # Outputs: X_filled - result of na_locf; op and verbose are returned unchanged |
| ####################################################################### |
| |
| forward_fill = function(Matrix[Double] X, Boolean op, Boolean verbose) |
|   return (Matrix[Double] X_filled, Boolean op, Boolean verbose) |
| { |
|   if(op) |
|     fillOption = "locf" |
|   else |
|     fillOption = "nocb" |
|   X_filled = na_locf(X=X, option=fillOption, verbose=verbose) |
| } |
| |
| |
| |
| # smote wrapper for relative over-sampling. |
| # |
| # X, Y           --- features and labels; Y is used as the row index of table(Y, 1) |
| # mask           --- passed to smote |
| # remainingRatio --- rounded to the nearest multiple of 100, then passed to smote |
| # verbose        --- not read; smote is always called with FALSE |
| # |
| # Over-sampling only runs when (max count - min count) / total > 0.2 and max(Y) <= 2. |
| # Every class other than the largest one that has more than one row is passed to smote, |
| # and all synthesized rows are appended. The returned rows are ordered by label. |
| SMOTE = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, Integer remainingRatio, Boolean verbose) |
|   return (Matrix[Double] X, Matrix[Double] Y) |
| { |
|   # get the class count |
|   classes = table(Y, 1) |
|   for(k in 1:nrow(classes) - 1) { |
|     minClass = min(classes) |
|     maxClass = max(classes) |
|     diff = (maxClass - minClass)/sum(classes) |
|     if(diff > 0.2 & max(Y) <=2) |
|     { |
|       # sort by label so that every class is one contiguous row range |
|       XY = order(target = cbind(Y, X), by = 1, decreasing=FALSE, index.return=FALSE) |
|       start_class = 1 |
|       end_class = 0 |
|       # counts of the data that is sliced below |
|       kmat = table(XY[, 1], 1) |
|       maxKIndex = as.scalar(rowIndexMax(t(kmat))) |
|       # accumulator with the right width, so the final rbind is valid even if it stays empty |
|       outSet = matrix(0, 0, ncol(XY)) |
|       remainingRatio = ifelse((remainingRatio%%100) >= 50, remainingRatio+(100 - (remainingRatio%%100)), |
|         remainingRatio-(remainingRatio%%100)) |
|       for(i in 1: nrow(kmat), check=0) { |
|         classCount = as.scalar(kmat[i, 1]) |
|         # labels without rows have no range to slice |
|         if(classCount > 0) { |
|           end_class = end_class + classCount |
|           class_t = XY[start_class:end_class, ] |
|           if((i != maxKIndex) & (nrow(class_t) > 1)) { |
|             synthesized = smote(class_t[, 2:ncol(XY)], mask, remainingRatio, 1, FALSE) |
|             # prepend the class label to the synthesized rows |
|             synthesized = cbind(matrix(as.scalar(class_t[2,1]), nrow(synthesized), 1), synthesized) |
|             outSet = rbind(outSet, synthesized) |
|           } |
|           start_class = end_class + 1 |
|         } |
|       } |
| |
|       # append everything that was synthesized (possibly nothing) |
|       XY = rbind(XY, outSet) |
|       Y = XY[, 1] |
|       X = XY[, 2:ncol(XY)] |
|     } |
|   } |
| } |
| |
| |
| ######################################################## |
| # Replaces NaN cells by a per-column default value. |
| # The default of a column is round(max - min), computed after NaN was set to 0. |
| # Returns the filled matrix and the row vector of defaults. |
| ######################################################## |
| fillDefault = function(Matrix[Double] X) |
|   return(Matrix[Double] X, Matrix[Double] defaullt){ |
|   nanCells = is.na(X) |
|   Xzero = replace(target=X, pattern=NaN, replacement=0) |
|   defaullt = round(colMaxs(Xzero) - colMins(Xzero)) |
|   # the former NaN cells are 0 now, so adding the default there fills them |
|   X = Xzero + (nanCells * defaullt) |
| } |
| |
| ######################################################## |
| # Replaces NaN cells by the per-column defaults passed in |
| # (the second output of fillDefault). |
| ######################################################## |
| fillDefaultApply = function(Matrix[Double] X, Matrix[Double] defaullt) |
|   return(Matrix[Double] X){ |
|   nanCells = is.na(X) |
|   # set NaN to 0 first, then add the column default at exactly those cells |
|   X = replace(target=X, pattern=NaN, replacement=0) + (nanCells * defaullt) |
| } |
| |
| # Records, for operator i, its hyper-parameter hp[i, 2] and the number of changes. |
| # Nothing is written unless hpForPruning has more than one column. |
| # The pipeline argument is not read. |
| storeDataForPrunning = function(Frame[Unknown] pipeline, Matrix[Double] hp, Matrix[Double] hpForPruning, |
|   Matrix[Double] changesByOp, Integer changes, Integer i) |
|   return(Matrix[Double] hpForPruning, Matrix[Double] changesByOp) |
| { |
|   trackingEnabled = ncol(hpForPruning) > 1 |
|   if(trackingEnabled) { |
|     changesByOp[1, i] = changes |
|     hpForPruning[1, i] = hp[i, 2] |
|   } |
| } |
| |
| ######################################################## |
| # Flips labels that a multinomial logistic regression model contradicts. |
| # |
| # X, Y      --- features and labels |
| # threshold --- a label is flipped when the prediction differs from it and |
| #               rowMaxs(prob) is above this value |
| # maxIter   --- maximum number of fit / flip rounds |
| # verbose   --- not read; the model functions are always called with FALSE |
| # |
| # Nothing is done unless Y has at least two distinct values, more than one row and max(Y) <= 2. |
| # In every round the flipped rows are moved to the end, so the row order of X and Y changes. |
| ######################################################## |
| flipLabels = function(Matrix[Double] X, Matrix[Double] Y, Double threshold, Integer maxIter =10, Boolean verbose = FALSE) |
|   return (Matrix[Double] X, Matrix[Double] Y) |
| { |
|   if(min(Y) != max(Y) & nrow(Y) > 1 & max(Y) <= 2) |
|   { |
|     betas = multiLogReg(X=X, Y=Y, icpt=1, reg=1e-4, maxi=100, maxii=0, verbose=FALSE) |
|     [prob, yhat, accuracy] = multiLogRegPredict(X, betas, Y, FALSE) |
|     # rows whose label disagrees with a prediction above the threshold |
|     inc = ((yhat != Y) & (rowMaxs(prob) > threshold)) |
|     while(sum(inc) > 0 & maxIter > 0 & min(Y) != max(Y) & nrow(Y) > 1) |
|     { |
|       agree = (inc == 0) |
|       Xcor = removeEmpty(target = X, margin = "rows", select = agree) |
|       Ycor = removeEmpty(target = Y, margin = "rows", select = agree) |
|       Xinc = removeEmpty(target = X, margin = "rows", select = inc) |
|       # the flagged rows take the predicted label |
|       Yflipped = removeEmpty(target = yhat, margin = "rows", select = inc) |
|       X = rbind(Xcor, Xinc) |
|       Y = rbind(Ycor, Yflipped) |
|       maxIter = maxIter - 1 |
|       # refit only while two classes remain; otherwise the loop condition ends the loop, |
|       # so prob and inc need no update on that path |
|       if(min(Y) != max(Y) & nrow(Y) > 1) { |
|         betas = multiLogReg(X=X, Y=Y, icpt=1, reg=1e-4, maxi=100, maxii=0, verbose=FALSE) |
|         [prob, yhat, accuracy] = multiLogRegPredict(X, betas, Y, FALSE) |
|         inc = ((yhat != Y) & (rowMaxs(prob) > threshold)) |
|       } |
|     } |
|   } |
| } |
| |
| # Wrapper for normalize: computes the column minima and maxima and applies normalizeApply. |
| # If X contains NaN, the ranges come from colMinMax, which skips NaN cells. |
| # Returns the normalized matrix together with the ranges that were used. |
| m_normalize = function(Matrix[Double] X) |
|   return (Matrix[Double] Y, Matrix[Double] cmin, Matrix[Double] cmax) |
| { |
|   hasMissing = sum(is.na(X)) > 0 |
|   if(hasMissing) { |
|     [cmin, cmax] = colMinMax(X); |
|   } |
|   else { |
|     cmax = colMaxs(X); |
|     cmin = colMins(X); |
|   } |
|   Y = normalizeApply(X, cmin, cmax); |
| } |
| |
| # Column-wise minimum and maximum computed over the non-NaN cells of every column. |
| # Returns two row vectors with one entry per column of X. |
| colMinMax = function(Matrix[Double] X) |
|   return (Matrix[Double] cmin, Matrix[Double] cmax) |
| { |
|   nFeatures = ncol(X) |
|   cmin = matrix(0, rows=1, cols=nFeatures) |
|   cmax = matrix(0, rows=1, cols=nFeatures) |
|   for(c in 1:nFeatures) { |
|     column = X[, c] |
|     # keep only the rows in which this column is present |
|     observed = removeEmpty(target=column, margin="rows", select = (is.na(column) == 0)) |
|     cmax[1, c] = max(observed) |
|     cmin[1, c] = min(observed) |
|   } |
| } |