blob: b42a49bd0e519618461618aec1b4786adc4016b8 [file] [log] [blame]
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------
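# This function executes the operators of a pipeline on the train data and
# applies the corresponding apply-functions to the test data.
#
# INPUT:
# ----------------------------------------------------------------------------------------
# pipeline         --- one-row frame with the operator name of every pipeline step
# Xtrain           --- train feature matrix
# Ytrain           --- train label matrix
# Xtest            --- test feature matrix
# Ytest            --- test label matrix
# metaList         --- list with the entries 'mask', 'fd' and 'applyFunc'
# hyperParameters  --- matrix with one row of hyper-parameters and flags per pipeline step
# hpForPruning     --- row vector receiving the second hyper-parameter of selected operators
# changesByOp      --- row vector receiving the number of changed test cells per operator
# flagsCount       --- number of meta flags stored in every hyper-parameter row
# test             --- test-mode flag
# verbose          --- print the pipeline and its hyper-parameters before execution
# ----------------------------------------------------------------------------------------
#
# OUTPUT:
# --------------------------------------------------------------------------------------
# Xtrain           --- transformed train feature matrix
# Ytrain           --- train label matrix (possibly modified by operators using the labels)
# Xtest            --- transformed test feature matrix
# Ytest            --- test label matrix
# t2               --- elapsed execution time
# hpForPruning     --- updated hyper-parameters for pruning
# changesByOp      --- updated number of changes per operator
# changesAll       --- number of test cells that differ from the original test data
# internalStates   --- list with the internal state of every executed pipeline step
# --------------------------------------------------------------------------------------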
s_executePipeline = function(Frame[String] pipeline, Matrix[Double] Xtrain, Matrix[Double] Ytrain,
  Matrix[Double] Xtest, Matrix[Double] Ytest, List[Unknown] metaList, Matrix[Double] hyperParameters, Matrix[Double] hpForPruning = as.matrix(0),
  Matrix[Double] changesByOp = as.matrix(0), Integer flagsCount, Boolean test = FALSE, Boolean verbose)
return (Matrix[Double] Xtrain, Matrix[Double] Ytrain, Matrix[Double] Xtest, Matrix[Double] Ytest,
  Double t2, Matrix[Double] hpForPruning, Matrix[Double] changesByOp, Double changesAll, List[Unknown] internalStates)
{
  # Runs every operator named in the one-row frame `pipeline` on the train data and,
  # when the operator has an apply-function, replays that function on the test data.
  # Returns the transformed data, the elapsed time t2, the pruning statistics
  # (hpForPruning, changesByOp, changesAll) and the collected per-step states.
  mask = as.matrix(metaList['mask'])
  applyFunc = as.frame(metaList['applyFunc'])
  internalStates = list()
  changesAll = 0.0
  numFeatures = ncol(Xtrain)
  XtestInitial = Xtest
  startTime = time()

  if(verbose) {
    print("checks rows in X = "+nrow(Xtrain)+" rows in Y = "+nrow(Ytrain)+" cols in X = "+ncol(Xtrain)+" col in Y = "+ncol(Ytrain))
    print("pipeline in execution "+toString(pipeline))
    print("pipeline hps "+toString(hyperParameters))
  }

  for(i in 1:ncol(pipeline)) {
    op = as.scalar(pipeline[1,i])
    applyOp = toString(as.scalar(applyFunc[1,i]))
    # snapshots used to restore the columns the operator did not work on
    XtrainBefore = Xtrain
    XtestBefore = Xtest
    [opArgs, dataFlag, yFlag, executeFlag] = matrixToList(Xtrain, Ytrain, mask, as.matrix(metaList['fd']), hyperParameters[i,], flagsCount, op)

    if(executeFlag == 1) {
      # the first output of the operator is the transformed train data,
      # the remaining outputs form the state handed to the apply-function
      outputs = evalList(op, opArgs)
      [outputs, trainResult] = remove(outputs, 1);
      Xtrain = as.matrix(trainResult)

      if(applyOp != "NA") {
        [Xtest, executeFlag] = applyDataFlag(Xtest, mask, dataFlag)
        internalStates = append(internalStates, outputs)
        outputs = append(outputs, list(X=Xtest));
        Xtest = eval(applyOp, outputs);
        Xtest = confirmData(Xtest, XtestBefore, mask, dataFlag)
      }
      else {
        internalStates = append(internalStates, as.frame("NA"))
      }

      # dataFlag 0 = numeric columns only, 1 = categorical columns only,
      # any other value = whole data (see applyDataFlag)
      Xtrain = confirmData(Xtrain, XtrainBefore, mask, dataFlag)

      if(yFlag) {
        [outputs, labelResult] = remove(outputs, 1);
        Ytrain = as.matrix(labelResult)
      }
      # Xtrain = confirmMeta(Xtrain, mask)
      # Xtest = confirmMeta(Xtest, mask)
    }
    else {
      print("not applying operation executeFlag = 0")
    }

    # count the changed test cells only while the test data keeps its shape
    # and the caller passed a real hpForPruning vector
    if(ncol(Xtest) == numFeatures & nrow(Xtest) == nrow(XtestBefore) & ncol(hpForPruning) > 1) {
      XtestNow = replace(target=Xtest, pattern=NaN, replacement=0)
      changesSingle = sum(abs(XtestNow - replace(target=XtestBefore, pattern=NaN, replacement=0)) > 0.001 )
      changesAll = sum(abs(XtestNow - replace(target=XtestInitial, pattern=NaN, replacement=0)) > 0.001 )
      if(op == "outlierBySd" | op == "outlierByIQR" | op == "imputeByFd") {
        hpForPruning[1, i] = hyperParameters[i, 2]
        changesByOp[1, i] = changesSingle
      }
    }
  }

  t2 = floor((time() - startTime) / 1e+6)
}
# This function will convert the matrix row-vector into list
matrixToList = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, Matrix[Double] FD,
  Matrix[Double] p, Integer flagsCount, String op)
return (List[Unknown] l, Integer dataFlag, Integer yFlag, Integer executeFlag)
{
  # Builds the argument list of one operator from its hyper-parameter row p.
  # p[1, 1] holds the number of hyper-parameters, p[1, 2 ...] their values, and the
  # last five entries of p are the flags (mask, FD, Y, verbose, data), in that order.
  lastCol = ncol(p)
  dataFlag = as.integer(as.scalar(p[1, lastCol]))
  verboseFlag = as.integer(as.scalar(p[1, lastCol - 1]))
  yFlag = as.integer(as.scalar(p[1, lastCol - 2]))
  fdFlag = as.integer(as.scalar(p[1, lastCol - 3]))
  maskFlag = as.integer(as.scalar(p[1, lastCol - 4]))

  # restrict X to the columns selected by the data flag; X is always the first argument
  [X, executeFlag] = applyDataFlag(X, mask, dataFlag)
  l = list(X)

  # optional arguments are appended in the fixed order Y, FD, mask
  if(yFlag == 1) {
    l = append(l, Y)
  }
  if(fDFlagIsSet(fdFlag)) {
    l = append(l, FD)
  }
  if(maskFlag == 1) {
    l = append(l, mask)
  }

  # append the hyper-parameter values themselves
  numHyperParams = as.scalar(p[1, 1])
  if(numHyperParams > 0) {
    for(k in 1:numHyperParams) {
      l = append(l, as.scalar(p[1, (k + 1)]))
    }
  }

  # operators with a verbose argument always receive FALSE
  if(verboseFlag == 1) {
    l = append(l, FALSE)
  }
}

# Returns TRUE when the FD flag of a hyper-parameter row is set.
fDFlagIsSet = function(Integer fdFlag)
return (Boolean isSet)
{
  isSet = (fdFlag == 1)
}
applyDataFlag = function(Matrix[Double] X, Matrix[Double] mask, Integer dataFlag)
return(Matrix[Double] X,Integer executeFlag)
{
  # Selects the columns of X an operator works on.
  #   dataFlag == 0: keep the columns with mask == 0; executeFlag = 0 if there are none
  #   dataFlag == 1: keep the columns with mask != 0; executeFlag = 0 if there are none
  #   otherwise    : X is returned unchanged
  executeFlag = 1
  numMasked = sum(mask)

  if(dataFlag == 0) {
    if(numMasked == ncol(mask)) {
      executeFlag = 0
    }
    else {
      # take numerics out and remove categorical
      X = removeEmpty(target=X, margin = "cols", select = (mask == 0))
    }
  }
  else if(dataFlag == 1) {
    if(numMasked == 0) {
      executeFlag = 0
    }
    else if(numMasked != ncol(mask)) {
      # take categorical out and remove numerics
      X = removeEmpty(target=X, margin = "cols", select = mask)
    }
  }
}
confirmData = function(Matrix[Double] nX, Matrix[Double] originalX, Matrix[Double] mask, Integer dataFlag)
return (Matrix[Double] X)
{
  # Re-assembles a full-width matrix after an operator ran on a column subset.
  #   nX        : the processed columns (mask == 0 columns for dataFlag 0,
  #               mask != 0 columns for dataFlag 1)
  #   originalX : the data before the operator; supplies the untouched columns
  #   mask      : 1 x ncol(originalX) indicator of the categorical columns
  # For any other dataFlag, or when the mask selects none or all columns,
  # nX is returned as is.
  #
  # NaN cells are tracked with explicit indicator matrices instead of sentinel
  # values: a sentinel restored with replace() on the merged matrix also wipes
  # every genuine cell that happens to equal the sentinel.
  isMixed = (sum(mask) > 0) & (sum(mask) != ncol(originalX))
  if((dataFlag == 0 | dataFlag == 1) & isMixed)
  {
    if(dataFlag == 0) {
      selProcessed = (mask == 0)
      selUntouched = mask
    }
    else {
      selProcessed = mask
      selUntouched = (mask == 0)
    }
    # columns the operator did not see
    Xrest = removeEmpty(target=originalX, margin="cols", select=selUntouched)

    # target column positions of both parts in the original layout
    colIds = seq(1, ncol(mask))
    posProcessed = removeEmpty(target=colIds, margin="rows", select=t(selProcessed))
    posUntouched = removeEmpty(target=colIds, margin="rows", select=t(selUntouched))

    # selection matrices that scatter each part back to its original columns
    P = table(seq(1, ncol(nX)), posProcessed, ncol(nX), ncol(originalX))
    Q = table(seq(1, ncol(Xrest)), posUntouched, ncol(Xrest), ncol(originalX))

    # scatter the NaN indicators the same way as the values
    nanCells = (is.na(nX) %*% P) + (is.na(Xrest) %*% Q)

    # reconstruct the original matrix with NaN temporarily set to 0
    X = (replace(target=nX, pattern=NaN, replacement=0) %*% P)
      + (replace(target=Xrest, pattern=NaN, replacement=0) %*% Q)

    # adding a 0/NaN matrix puts NaN back exactly where it was
    X = X + replace(target=nanCells, pattern=1, replacement=NaN)
  }
  else X = nX
}
#######################################################################
# Wrapper of transformencode OHE call, to call inside eval as a function
# Inputs: The input dataset X, and mask of the columns
# Output: OHEd matrix X
#######################################################################
dummycoding = function(Matrix[Double] X, Matrix[Double] mask)
return (Matrix[Double] X, String jspec, Frame[Unknown] meta) {
  # One-hot encodes the columns flagged in mask through transformencode.
  # Without flagged columns X is returned untouched, together with an empty
  # spec and a placeholder meta frame.
  jspec = ""
  meta = as.frame("NULL")
  hasCategorical = sum(mask) > 0
  if(hasCategorical) {
    # comma-separated ids of the flagged columns for the dummycode spec
    catColumns = vectorToCsv(mask)
    jspec = "{ids:true, dummycode:["+catColumns+"]}";
    # missing values are set to 0 before encoding
    X = replace(target=X, pattern=NaN, replacement=0)
    [X, meta] = transformencode(target=as.frame(X), spec=jspec);
  }
}
dummycodingApply = function(Matrix[Double] X, String jspec, Frame[Unknown] meta)
return (Matrix[Double] Y) {
  # Applies a previously fitted dummy-coding (spec + meta) to X.
  # An empty spec means nothing was encoded, so X is passed through.
  Y = X
  if(jspec != "") {
    Y = transformapply(target=as.frame(X), spec=jspec, meta=meta);
  }
}
#######################################################################
# Wrapper of imputeByFD call, to call inside eval as a function
# Inputs: The input dataset X, the FD mask of the columns, and the threshold value
# Output: filled matrix X and the fill matrix consumed by imputeByFdApply
#######################################################################
imputeByFd = function(Matrix[Double] X, Matrix[Double] fdMask, Double threshold)
return (Matrix[Double] X, Matrix[Double] fillMatrix)
{
  # Discovers functional dependencies between columns and imputes the dependent
  # column from the determining one. fillMatrix stores, in row
  # ncol(FD) * (i - 1) + j, the mapping returned by imputeByFD for the pair (i, j);
  # it stays as.matrix(0) when fdMask has no non-zero entry.
  fillMatrix = as.matrix(0)
  if(sum(fdMask) > 0)
  {
    # discovery runs on a copy in which missing values are set to 1
    Xdiscover = replace(target=X, pattern=NaN, replacement=1)
    fdMask = removeEmpty(target=fdMask, margin="cols")
    FD = discoverFD(X=Xdiscover, Mask=fdMask, threshold=threshold)

    # drop the diagonal and binarize the dependency matrix
    offDiagonal = (diag(matrix(1, rows=nrow(FD), cols=1)) ==0)
    FD = (offDiagonal * FD) > 0

    numFdCols = ncol(FD)
    fillMatrix = matrix(0, rows=numFdCols * numFdCols, cols=max(Xdiscover))
    if(sum(FD) > 0)
    {
      for(i in 1: nrow(FD)) {
        for(j in 1:numFdCols) {
          # the whole test is kept as one expression so every operand is evaluated
          if(as.scalar(FD[i, j]) > 0 & (min(X[, i]) != 0) & (min(X[, j]) != 0) & (sum(FD[, j]) != nrow(FD))
            & (as.scalar(fdMask[1, j]) != 0) & (as.scalar(fdMask[1, i]) != 0)) {
            [imputedColumn, mapping] = imputeByFD(X[,i], X[,j], threshold, FALSE)
            X[, j] = imputedColumn
            fillMatrix[numFdCols * (i - 1) + j, 1:nrow(mapping)] = t(mapping)
          }
        }
      }
    }
  }
}
imputeByFdApply = function(Matrix[Double] X, Matrix[Double] fillMatrix)
return (Matrix[Double] X)
{
  # Replays the imputations recorded in fillMatrix. Row numCols * (i - 1) + j of
  # fillMatrix holds the mapping for the column pair (i, j); rows whose sum is
  # not positive are skipped.
  numCols = sqrt(nrow(fillMatrix))
  for(i in 1: numCols)
  {
    for(j in 1:numCols) {
      rowId = numCols * (i - 1) + j
      if(sum(fillMatrix[rowId,]) > 0) {
        # turn the stored row into a column and strip the zero entries
        mapping = removeEmpty(target=t(fillMatrix[rowId, ]), margin="rows")
        X[, j] = imputeByFDApply(X[, i], mapping)
      }
    }
  }
}
#######################################################################
# Wrapper of na_locf to call inside eval as a function
# Output: filled matrix X
#######################################################################
forward_fill = function(Matrix[Double] X, Boolean op, Boolean verbose)
return (Matrix[Double] X_filled, Boolean op, Boolean verbose)
{
  # Fills missing values with na_locf: option "locf" when op is TRUE,
  # option "nocb" otherwise. op and verbose are returned unchanged.
  if(op) {
    option = "locf"
  }
  else {
    option = "nocb"
  }
  X_filled = na_locf(X=X, option=option, verbose=verbose)
}
# smote wrapper for doing relative over-sampling
# Over-samples the less frequent class(es) of (X, Y) with the smote builtin.
# X    : feature matrix
# Y    : label column; table(Y, 1) is used to count the rows per label
# mask : passed through to smote
# remainingRatio : over-sampling ratio, rounded to a multiple of 100 before use
# verbose : not used in this function (smote is always called with FALSE)
# Returns X and Y; they are only changed when the imbalance test below succeeds.
SMOTE = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, Integer remainingRatio, Boolean verbose)
return (Matrix[Double] X, Matrix[Double] Y)
{
# get the class count
classes = table(Y, 1)
# NOTE(review): the loop bound is written as 1:nrow(classes) - 1 - confirm the intended
# precedence; classes is not recomputed inside the loop, so every iteration tests the same counts
for(k in 1:nrow(classes) - 1) {
minClass = min(classes)
maxClass = max(classes)
# gap between the largest and smallest class count, relative to the number of rows
diff = (maxClass - minClass)/sum(classes)
# only act on a gap above 0.2 and when no label exceeds 2
if(diff > 0.2 & max(Y) <=2)
{
# sort the rows by label so that every class forms one contiguous row range
XY = order(target = cbind(Y, X), by = 1, decreasing=FALSE, index.return=FALSE)
synthesized = matrix(0,0,0) # initialize variable
start_class = 1
end_class = 0
kmat = table(XY[, 1], 1)
# NOTE(review): getMax is never read afterwards
getMax = max(kmat)
# index of the most frequent class; this class is not over-sampled
maxKIndex = as.scalar(rowIndexMax(t(kmat)))
outSet = matrix(0, 0, ncol(XY))
# round remainingRatio to a multiple of 100 (a remainder of 50 or more rounds up)
remainingRatio = ifelse((remainingRatio%%100) >= 50, remainingRatio+(100 - (remainingRatio%%100)),
remainingRatio-(remainingRatio%%100))
for(i in 1: nrow(kmat), check=0) {
# rows start_class..end_class of the sorted data belong to class i
end_class = end_class + as.scalar(classes[i])
class_t = XY[start_class:end_class, ]
if((i != maxKIndex) & (nrow(class_t) > 1)) {
synthesized = smote(class_t[, 2:ncol(XY)], mask, remainingRatio, 1, FALSE)
# prepend the class label (read from the second row of the class) to the synthetic rows
synthesized = cbind(matrix(as.scalar(class_t[2,1]), nrow(synthesized), 1), synthesized)
outSet = rbind(outSet, synthesized)
}
start_class = end_class + 1
}
# NOTE(review): only the last synthesized block is appended; outSet, which accumulates
# the blocks of all classes, is never read - confirm which one is intended
XY = rbind(XY, synthesized)
Y = XY[, 1]
X = XY[, 2:ncol(XY)]
}
}
}
########################################################
# The function will replace the null with default values
########################################################
fillDefault = function(Matrix[Double] X)
return(Matrix[Double] X, Matrix[Double] defaullt){
  # Replaces every NaN of X with a per-column default value.
  # The default is round(column max - column min), computed after the NaN
  # cells were set to 0, and is returned as the row vector defaullt.
  nanCells = is.na(X)
  X = replace(target=X, pattern=NaN, replacement=0)
  defaullt = round(colMaxs(X) - colMins(X))
  # nanCells is 1 exactly at the former NaN cells, so only those receive the default
  X = X + (nanCells * defaullt)
}
########################################################
# The function will replace the null with the given default values
########################################################
fillDefaultApply = function(Matrix[Double] X, Matrix[Double] defaullt)
return(Matrix[Double] X){
  # Replaces every NaN of X with the supplied per-column default value.
  nanCells = is.na(X)
  X = replace(target=X, pattern=NaN, replacement=0)
  # nanCells is 1 exactly at the former NaN cells, so only those receive the default
  X = X + (nanCells * defaullt)
}
storeDataForPrunning = function(Frame[Unknown] pipeline, Matrix[Double] hp, Matrix[Double] hpForPruning,
  Matrix[Double] changesByOp, Integer changes, Integer i)
return(Matrix[Double] hpForPruning, Matrix[Double] changesByOp)
{
  # Records, for pipeline step i, the second hyper-parameter of that step and the
  # number of changes it caused. Nothing is stored when hpForPruning has a single column.
  hasPruningSlots = ncol(hpForPruning) > 1
  if(hasPruningSlots) {
    changesByOp[1, i] = changes
    hpForPruning[1, i] = hp[i, 2]
  }
}
########################################################
# The function will flip the noisy labels
########################################################
flipLabels = function(Matrix[Double] X, Matrix[Double] Y, Double threshold, Integer maxIter =10, Boolean verbose = FALSE)
return (Matrix[Double] X, Matrix[Double] Y)
{
  # Iteratively relabels rows whose label disagrees with a multiLogReg prediction
  # made with a probability above threshold. Runs only when Y has more than one
  # row, more than one distinct value and no label above 2; stops after maxIter
  # rounds or when no row is flagged any more. Flagged rows are moved to the end
  # of X and Y in every round.
  classes1 = table(Y, 1)   # result unused; the call is kept as in the original
  if(min(Y) != max(Y) & nrow(Y) > 1 & max(Y) <= 2)
  {
    betas = multiLogReg(X=X, Y=Y, icpt=1, reg=1e-4, maxi=100, maxii=0, verbose=FALSE)
    [prob, yhat, accuracy] = multiLogRegPredict(X, betas, Y, FALSE)
    # rows predicted differently from their label with high confidence
    suspect = ((yhat != Y) & (rowMaxs(prob) > threshold))

    while(sum(suspect) > 0 & maxIter > 0 & min(Y) != max(Y) & nrow(Y) > 1)
    {
      trusted = (suspect==0)
      Xtrusted = removeEmpty(target = X, margin = "rows", select = trusted)
      Ytrusted = removeEmpty(target = Y, margin = "rows", select = trusted)
      Xsuspect = removeEmpty(target = X, margin = "rows", select = suspect)
      # the flagged rows take over the predicted label
      Yflipped = removeEmpty(target = yhat, margin = "rows", select = suspect)

      X = rbind(Xtrusted, Xsuspect)
      Y = rbind(Ytrusted, Yflipped)
      maxIter = maxIter - 1

      # refit only while there is still more than one class left
      if(min(Y) != max(Y) & nrow(Y) > 1) {
        betas = multiLogReg(X=X, Y=Y, icpt=1, reg=1e-4, maxi=100, maxii=0, verbose=FALSE)
        [prob, yhat, accuracy] = multiLogRegPredict(X, betas, Y, FALSE)
        suspect = ((yhat != Y) & (rowMaxs(prob) > threshold))
      }
    }
  }
  classes = table(Y, 1)   # result unused; the call is kept as in the original
}
# # # # wrapper for normalize
m_normalize = function(Matrix[Double] X)
return (Matrix[Double] Y, Matrix[Double] cmin, Matrix[Double] cmax)
{
  # Normalizes X with normalizeApply using its per-column minimum and maximum,
  # which are returned as cmin and cmax. When X contains NaN, the ranges come
  # from colMinMax instead of colMins / colMaxs.
  containsNaN = sum(is.na(X)) > 0
  if(containsNaN) {
    [cmin, cmax] = colMinMax(X);
  }
  else {
    cmax = colMaxs(X);
    cmin = colMins(X);
  }
  Y = normalizeApply(X, cmin, cmax);
}
# # # get column min and max, ignoring the NaN entries of each column
colMinMax = function(Matrix[Double] X)
return (Matrix[Double] cmin, Matrix[Double] cmax)
{
  # Computes the minimum and maximum of every column of X after removing the
  # NaN entries of that column. Both results are 1 x ncol(X) row vectors.
  numCols = ncol(X)
  cmin = matrix(0, rows=1, cols=numCols)
  cmax = matrix(0, rows=1, cols=numCols)
  for(j in 1:numCols) {
    column = X[, j]
    # keep only the rows of this column that are not NaN
    observed = removeEmpty(target=column, margin="rows", select = (is.na(column) == 0))
    cmax[1, j] = max(observed)
    cmin[1, j] = min(observed)
  }
}