blob: 7d466cb155f28dbbd293552f7180a1a64e247e7c [file] [log] [blame]
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------
# Runs every operator of `pipeline` (one operator name per column of row 1) on
# the row-wise concatenation of train and test data and splits the result back
# into train and test parts.
#
# Inputs:
#   logical          logical operator name per pipeline column; columns tagged
#                    "CI" are applied to the train rows only when test == TRUE
#   pipeline         operator names; each is invoked through eval(op, hp)
#   X, Y             train features / labels
#   Xtest, Ytest     test features / labels
#   metaList         list providing the entries 'mask' and 'fd'
#   hyperParameters  one row per operator, decoded by matrixToList
#   hpForPruning,
#   changesByOp      bookkeeping matrices updated through storeDataForPrunning
#   flagsCount       forwarded to matrixToList
#   test             enables the train-only handling of "CI" operators
#   verbose          print dimensions, pipeline and hyper-parameters up front
# Outputs:
#   X, Y, Xtest, Ytest  processed train and test parts
#   t2               elapsed time, floor((time() - t1) / 1e+6)
#                    (presumably milliseconds - confirm the unit of time())
#   hpForPruning, changesByOp  updated bookkeeping matrices
s_executePipeline = function(Frame[String] logical = as.frame("NULL"), Frame[String] pipeline, Matrix[Double] X, Matrix[Double] Y,
  Matrix[Double] Xtest, Matrix[Double] Ytest, List[Unknown] metaList, Matrix[Double] hyperParameters, Matrix[Double] hpForPruning = as.matrix(0),
  Matrix[Double] changesByOp = as.matrix(0), Integer flagsCount, Boolean test = FALSE, Boolean verbose)
  return (Matrix[Double] X, Matrix[Double] Y, Matrix[Double] Xtest, Matrix[Double] Ytest, Double t2, Matrix[Double] hpForPruning, Matrix[Double] changesByOp)
{
  mask = as.matrix(metaList['mask'])
  FD = as.matrix(metaList['fd'])
  # stack the test rows below the train rows
  X = rbind(X, Xtest)
  Y = rbind(Y, Ytest)
  testRow = nrow(Xtest)
  Xout = X
  t1 = time()
  if(verbose) {
    print("checks rows in X = "+nrow(X)+" rows in Y = "+nrow(Y)+" cols in X = "+ncol(X)+" col in Y = "+ncol(Y))
    print("pipeline in execution "+toString(pipeline))
    print("pipeline hps "+toString(hyperParameters))
  }
  for(i in 1:ncol(pipeline)) {
    # boundary between train and test rows inside the stacked data
    trainEndIdx = (nrow(X) - nrow(Xtest))
    testStIdx = trainEndIdx + 1
    op = as.scalar(pipeline[1,i])
    lgOp = as.scalar(logical[1,i])
    if(test == FALSE | lgOp != "CI") {
      # operator is applied to the stacked train + test data
      Xclone = X
      [hp, dataFlag, yFlag, executeFlag] = matrixToList(X, Y, mask, FD, hyperParameters[i], flagsCount, op)
      if(executeFlag == 1) {
        X = eval(op, hp)
        Xout = X
        X = confirmData(X, Xclone, mask, dataFlag, yFlag)
        # with yFlag set the labels travel as the last column of X
        if(yFlag == 1)
        {
          Y = X[, ncol(X)]
          X = X[, 1:ncol(X) - 1]
        }
        X = confirmMeta(X, mask)
      }
      else {
        print("not applying "+op+" executeFlag = 0")
      }
    }
    else {
      # "CI" operator in test mode: apply it to the train rows only
      Xtest = X[testStIdx:nrow(X), ]
      Ytest = Y[testStIdx:nrow(X), ]
      X = X[1:trainEndIdx, ]
      Y = Y[1:trainEndIdx, ]
      # reference copy taken after the split so its rows match the operator output
      Xclone = X
      [hp, dataFlag, yFlag, executeFlag] = matrixToList(X, Y, mask, FD, hyperParameters[i], flagsCount, op)
      if(executeFlag == 1)
      {
        X = eval(op, hp)
        X = confirmData(X, Xclone, mask, dataFlag, yFlag)
        if(yFlag == 1)
        {
          Y = X[, ncol(X)]
          X = X[, 1:ncol(X) - 1]
        }
        X = confirmMeta(X, mask)
      }
      else {
        print("not applying "+op+" executeFlag = 0")
      }
      # always restore the stacked layout, also when the operator was skipped;
      # otherwise the next iteration and the final split use wrong boundaries
      X = rbind(X, Xtest)
      Y = rbind(Y, Ytest)
      # the operator may have changed the number of train rows
      trainEndIdx = nrow(X) - nrow(Xtest)
      testStIdx = trainEndIdx + 1
    }
    if(op == "outlierBySd" | op == "outlierByIQR" | op == "imputeByFd") {
      # number of cells the operator changed (NaN treated as 0 on both sides)
      changes = sum(abs(replace(target=Xout, pattern=NaN, replacement=0) - replace(target=as.matrix(hp[1]), pattern=NaN, replacement=0)) > 0.001 )
      [hpForPruning, changesByOp] = storeDataForPrunning(pipeline, hyperParameters, hpForPruning, changesByOp, changes, i)
    }
  }
  # split the stacked data back into train and test parts
  Xtest = X[testStIdx:nrow(X), ]
  Ytest = Y[testStIdx:nrow(X), ]
  X = X[1:trainEndIdx, ]
  Y = Y[1:trainEndIdx, ]
  # quick validation check
  if(nrow(Xtest) != testRow)
    stop("executePipeline: test rows altered")
  t2 = floor((time() - t1) / 1e+6)
}
# Decodes one hyper-parameter row vector `p` into the argument list handed to
# eval() for a pipeline operator.
# Layout of p as read here: p[1,1] = number of hyper-parameters, followed by
# their values; the last five columns are, from left to right, the mask flag,
# FD flag, Y flag, verbose flag and data flag.
# The list is assembled as: X (optionally column-filtered), then Y, FD and mask
# when their flags are 1, then the hyper-parameter values, then FALSE when the
# verbose flag is 1.
# executeFlag is 0 when the operator is restricted to a column kind the data
# does not have: dataFlag 0 with sum(mask) == ncol(mask), or dataFlag 1 with
# sum(mask) == 0.
# flagsCount and op are accepted but not used in this function.
matrixToList = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, Matrix[Double] FD,
  Matrix[Double] p, Integer flagsCount, String op)
  return (List[Unknown] l, Integer dataFlag, Integer yFlag, Integer executeFlag)
{
  lastCol = ncol(p)
  dataFlag = as.integer(as.scalar(p[1, lastCol]))
  verboseFlag = as.integer(as.scalar(p[1, lastCol - 1]))
  yFlag = as.integer(as.scalar(p[1, lastCol - 2]))
  appendFD = as.integer(as.scalar(p[1, lastCol - 3]))
  appendMask = as.integer(as.scalar(p[1, lastCol - 4]))
  executeFlag = 1

  # restrict X to the columns selected by the data flag
  if(dataFlag == 0)
  {
    if(sum(mask) == ncol(mask))
      executeFlag = 0
    else {
      # keep only the columns whose mask entry is 0
      X = removeEmpty(target=X, margin = "cols", select = (mask == 0))
    }
  }
  else if(dataFlag == 1)
  {
    if(sum(mask) == 0)
      executeFlag = 0
    else {
      # keep only the columns selected by mask
      X = removeEmpty(target=X, margin = "cols", select = mask)
    }
  }

  # data first, then the optional Y / FD / mask arguments
  l = list(X)
  if(yFlag == 1) {
    l = append(l, Y)
  }
  if(appendFD == 1)
  {
    l = append(l, FD)
  }
  if(appendMask == 1)
  {
    l = append(l, mask)
  }

  # hyper-parameter values live in columns 2 .. (count + 1)
  hpCount = as.scalar(p[1,1])
  if(hpCount > 0) {
    for(col in 2:(hpCount + 1))
      l = append(l, as.scalar(p[1, col]))
  }

  # operators that take a verbose argument always receive FALSE
  if(verboseFlag == 1)
    l = append(l, FALSE)
}
# Rounds the columns selected by `mask` to whole numbers and leaves all other
# columns untouched. NaN cells are temporarily replaced by 9999 and restored
# as NaN at the end.
# X is returned unchanged when mask selects nothing or when the column counts
# of X and mask differ.
confirmMeta = function(Matrix[Double] X, Matrix[Double] mask)
  return (Matrix[Double] X)
{
  if((sum(mask) > 0) & (ncol(X) == ncol(mask)))
  {
    # remember where the NaNs are, then fill them with a placeholder
    nanCells = is.na(X)
    filled = replace(target = X, pattern = NaN, replacement = 9999)
    # masked columns, rounded in case an operator produced fractions
    rounded = round(removeEmpty(target=filled, margin="cols", select = mask))
    # selection matrix that places the rounded columns back at their positions
    maskedPos = removeEmpty(target=seq(1, ncol(mask)), margin="rows", select=t(mask))
    placeBack = table(seq(1, ncol(rounded)), maskedPos, ncol(rounded), ncol(filled))
    # unmasked columns keep their values, masked columns are zeroed here
    untouched = filled * (mask == 0)
    X = (rounded %*% placeBack) + untouched
    # adding NaN turns the placeholder cells back into NaN
    X = X + replace(target = nanCells, pattern = 1, replacement = NaN)
  }
}
# Re-assembles a full-width matrix after an operator ran on a column subset.
#
# Inputs:
#   nX         operator output; with yFlag == 1 its last column holds Y
#   originalX  data as it was before the operator ran
#   mask       row vector; non-zero entries mark the masked (categorical) columns
#   dataFlag   0: nX holds the columns with mask == 0,
#              1: nX holds the columns with mask != 0,
#              any other value: nX is already full-width
#   yFlag      1 if Y is appended to nX as the last column
# Output:
#   X          processed columns from nX and the remaining columns from
#              originalX at their original positions (plus Y as last column
#              when yFlag == 1)
#
# Missing values are carried through explicit NaN masks. The previous
# implementation used the sentinels max(nX) + 1 and -1111, which had two
# problems: max(nX) is itself NaN when nX contains NaN, and turning the
# sentinels back into NaN over the whole merged matrix also wiped genuine
# values that happened to equal a sentinel.
confirmData = function(Matrix[Double] nX, Matrix[Double] originalX, Matrix[Double] mask, Integer dataFlag, Integer yFlag)
  return (Matrix[Double] X)
{
  if(yFlag == 1)
  {
    Y = nX[, ncol(nX)]
    nX = nX[, 1: ncol(nX) - 1]
  }
  # only a partial-column run on mixed data needs re-assembly
  isPartial = (dataFlag == 0 | dataFlag == 1) & (sum(mask) > 0) & (sum(mask) != ncol(originalX))
  if(isPartial)
  {
    if(dataFlag == 0) {
      processedSel = (mask == 0)
      untouchedSel = (mask != 0)
    }
    else {
      processedSel = (mask != 0)
      untouchedSel = (mask == 0)
    }
    # columns the operator did not see, taken from the original data
    untouched = removeEmpty(target=originalX, margin="cols", select=untouchedSel)
    # selection matrices that scatter both parts to their original positions
    colIds = seq(1, ncol(mask))
    processedPos = removeEmpty(target=colIds, margin="rows", select=t(processedSel))
    untouchedPos = removeEmpty(target=colIds, margin="rows", select=t(untouchedSel))
    p = table(seq(1, ncol(nX)), processedPos, ncol(nX), ncol(originalX))
    q = table(seq(1, ncol(untouched)), untouchedPos, ncol(untouched), ncol(originalX))
    # every output cell receives exactly one source cell, so this is a 0/1 matrix
    nanCells = (is.na(nX) %*% p) + (is.na(untouched) %*% q)
    # multiply NaN-free copies so a NaN cannot spread across its whole row
    X = (replace(target=nX, pattern=NaN, replacement=0) %*% p)
      + (replace(target=untouched, pattern=NaN, replacement=0) %*% q)
    # adding NaN restores the missing cells, adding 0 keeps the others
    X = X + replace(target=nanCells, pattern=1, replacement=NaN)
  }
  else X = nX
  if(yFlag == 1)
    X = cbind(X, Y)
}
#######################################################################
# Wrapper around the transformencode one-hot-encoding call so that it can be
# invoked through eval as a function
# Inputs: the dataset X and the mask of the columns to dummy-code
# Output: one-hot encoded matrix; X itself when the mask selects no column
#######################################################################
dummycoding = function(Matrix[Double] X, Matrix[Double] mask)
  return (Matrix[Double] dX_train) {
  # default result: nothing to encode
  dX_train = X
  if(sum(mask) > 0)
  {
    # NaNs are replaced by 1 before encoding
    noNaN = replace(target=X, pattern=NaN, replacement=1)
    colList = vectorToCsv(mask)
    # specification for one-hot encoding of the selected features
    ohSpec = "{ids:true, dummycode:["+colList+"]}";
    [dX_train, dM] = transformencode(target=as.frame(noNaN), spec=ohSpec);
  }
}
#######################################################################
# Wrapper around discoverFD / imputeByFD so that FD-based imputation can be
# invoked through eval as a function
# Inputs: the dataset X, the mask of the columns and the threshold value
# Output: filled matrix X (X unchanged when fdMask selects nothing)
#######################################################################
imputeByFd = function(Matrix[Double] X, Matrix[Double] fdMask, Double threshold)
  return (Matrix[Double] X_filled)
{
  if(sum(fdMask) > 0)
  {
    fdMask = removeEmpty(target=fdMask, margin="cols")
    # discovery runs on a copy in which NaNs are replaced by 1
    noNaN = replace(target=X, pattern=NaN, replacement=1)
    FD = discoverFD(X=noNaN, Mask=fdMask, threshold=threshold)
    # zero the diagonal and binarize the remaining entries
    offDiagonal = (diag(matrix(1, rows=nrow(FD), cols=1)) ==0)
    FD = (offDiagonal * FD) > 0
    if(sum(FD) > 0)
    {
      for(src in 1: nrow(FD))
      {
        for(tgt in 1:ncol(FD)) {
          # X is updated inside the loop, so the column minima are re-read
          # for every pair; all sub-conditions are evaluated each time
          applies = as.scalar(FD[src, tgt]) > 0 & (min(X[, src]) != 0) & (min(X[, tgt]) != 0) & (sum(FD[, tgt]) != nrow(FD))
            & (as.scalar(fdMask[1, tgt]) != 0) & (as.scalar(fdMask[1, src]) != 0)
          if(applies)
            X = imputeByFD(X, src, tgt, threshold, FALSE)
        }
      }
    }
  }
  X_filled = X
}
#######################################################################
# Wrapper around na_locf so that it can be invoked through eval as a function
# Inputs: the dataset X, op (TRUE selects option "locf", FALSE selects "nocb")
#         and the verbose flag forwarded to na_locf
# Output: filled matrix X
#######################################################################
forward_fill = function(Matrix[Double] X, Boolean op, Boolean verbose)
  return (Matrix[Double] X_filled)
{
  if(op)
    fillOption = "locf"
  else
    fillOption = "nocb"
  X_filled = na_locf(X=X, option=fillOption, verbose=verbose)
}
# smote wrapper for doing relative over-sampling
#
# Inputs:
#   X, Y            features and labels; the class counts are obtained with
#                   table(Y[, 1], 1), so labels are used as 1-based row ids
#   mask            forwarded to smote
#   remainingRatio  over-sampling amount; rounded to the nearest multiple of
#                   100 before it is forwarded to smote
#   verbose         accepted but not used; smote is always called with FALSE
# Output:
#   XY              cbind(X, Y). Over-sampling only happens when
#                   (largest class - smallest class) / total rows > 0.5; then
#                   the rows are sorted by label and the synthetic rows of
#                   every non-majority class are appended at the bottom.
SMOTE = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, Integer remainingRatio, Boolean verbose)
  return (Matrix[Double] XY)
{
  # get the class count
  classes = table(Y[, 1], 1)
  diff = (max(classes) - min(classes))/sum(classes)
  if(diff > 0.5)
  {
    # label first, rows grouped by class
    XY = order(target = cbind(Y, X), by = 1, decreasing=FALSE, index.return=FALSE)
    maxKIndex = as.scalar(rowIndexMax(t(classes)))
    outSet = matrix(0, 0, ncol(XY))
    remainingRatio = ifelse((remainingRatio%%100) >= 50, remainingRatio+(100 - (remainingRatio%%100)),
      remainingRatio-(remainingRatio%%100))
    start_class = 1
    end_class = 0
    for(i in 1: nrow(classes), check=0) {
      classSize = as.scalar(classes[i, 1])
      # label ids without any row have no slice in XY
      if(classSize > 0) {
        end_class = end_class + classSize
        class_t = XY[start_class:end_class, ]
        # skip the majority class and classes with a single row
        # (no neighbour available to interpolate with)
        if((i != maxKIndex) & (classSize > 1)) {
          synthesized = smote(class_t[, 2:ncol(XY)], mask, remainingRatio, 1, FALSE)
          # all rows of class_t share one label, so the first row is sufficient
          synthesized = cbind(matrix(as.scalar(class_t[1,1]), nrow(synthesized), 1), synthesized)
          outSet = rbind(outSet, synthesized)
        }
        start_class = end_class + 1
      }
    }
    # append the synthetic rows of ALL minority classes, not only the last batch
    XY = rbind(XY, outSet)
    # move the label back to the last column
    XY = cbind(XY[, 2:ncol(XY)], XY[, 1])
  }
  else {
    XY = cbind(X, Y)
  }
}
########################################################
# Replaces every NaN with a per-column default value.
# The default of a column is round(max - min) of that column, computed after
# the NaNs were set to 0. Computing the range while NaNs are still present
# would give a NaN default for exactly the columns that need one.
# Input:  matrix X, possibly containing NaN
# Output: X with every NaN replaced by the default of its column
########################################################
fillDefault = function(Matrix[Double] X)
  return(Matrix[Double] X){
  # 1 where a value is missing, 0 elsewhere
  Mask = is.na(X)
  X = replace(target=X, pattern=NaN, replacement=0)
  # per-column range on the NaN-free data (missing cells count as 0)
  defaultValue = round(colMaxs(X) - colMins(X))
  # missing cells are 0 at this point, so adding the masked default fills them
  X = X + (Mask * defaultValue)
}
########################################################
# A slightly changed version of PCA
# Inputs: X, the number of components K, and the center / scale switches
#         forwarded to scale()
# Output: X projected onto the K eigenvectors with the largest eigenvalues.
#         When K >= ncol(X) - 1 the input is returned as is; when eigen()
#         yields at most one row the scaled input is returned.
#         Cells equal to 1/0 are set to 0 in every case.
########################################################
m_pca = function(Matrix[Double] X, Integer K=2, Boolean center=TRUE, Boolean scale=TRUE)
  return (Matrix[Double] Xout)
{
  if(K < ncol(X) - 1) {
    numRows = nrow(X);
    numCols = ncol(X);
    # perform z-scoring (centering and scaling)
    [X, Centering, ScaleFactor] = scale(X, center, scale);
    # fallback result when no usable eigen decomposition is available
    Xout = X
    # co-variance matrix
    colMean = colSums(X)/numRows;
    covar = (t(X) %*% X)/(numRows-1) - (numRows/(numRows-1))*t(colMean) %*% colMean;
    # compute eigen vectors and values
    [evalues, evectors] = eigen(covar);
    if(nrow(evalues) > 1 & nrow(evectors) > 1)
    {
      # selection matrix that orders the eigenvectors by decreasing eigenvalue
      sortedIdx = order(target=evalues,by=1,decreasing=TRUE,index.return=TRUE);
      reorder = table(seq(1,numCols),sortedIdx);
      evectors = evectors %*% reorder;
      # project the data onto the K dominant eigenvectors
      Xout = X %*% evectors[,1:K];
    }
  }
  else Xout = X
  Xout = replace(target=Xout, pattern=1/0, replacement=0);
}
# Wrapper around tomeklink so that it can be invoked through eval as a function
# Inputs: features X and labels y
# Output: the first two results of tomeklink joined column-wise (features
#         first, labels last); the third result is discarded
wtomeklink = function(Matrix[Double] X, Matrix[Double] y)
  return (Matrix[Double] XY) {
  [keptX, keptY, dropped] = tomeklink(X, y)
  XY = cbind(keptX, keptY)
}
# Records, for pipeline position i, the value hp[i, 2] and the number of
# changed cells in the two bookkeeping row vectors.
# Nothing is recorded when hpForPruning has a single column (which is the
# shape of the as.matrix(0) default used by s_executePipeline).
# pipeline is accepted but not used in this function.
storeDataForPrunning = function(Frame[Unknown] pipeline, Matrix[Double] hp, Matrix[Double] hpForPruning, Matrix[Double] changesByOp, Integer changes, Integer i)
  return(Matrix[Double] hpForPruning, Matrix[Double] changesByOp)
{
  trackingEnabled = ncol(hpForPruning) > 1
  if(trackingEnabled) {
    recordedHp = hp[i, 2]
    hpForPruning[1, i] = recordedHp
    changesByOp[1, i] = changes
  }
}