blob: b9f660a30d270bec2a92006015ad2ae0503cc4fb [file] [log] [blame]
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------
# This script will read the dirty and clean data, then it will apply the best pipeline on dirty data
# and then will classify both cleaned dataset and check if the cleaned dataset is performing same as original dataset
# in terms of classification accuracy
#
# INPUT:
# --------------------------------------------------------------------------------
# trainData ---
# testData ---
# metaData ---
# lp ---
# pip ---
# hp ---
# evaluationFunc ---
# evalFunHp ---
# isLastLabel ---
# correctTypos ---
# --------------------------------------------------------------------------------
#
# OUTPUT:
# -----------------------------------------------------------------------------------------------
# scores ---
# -----------------------------------------------------------------------------------------------
source("scripts/builtin/topk_cleaning.dml") as topk;
s_apply_pipeline = function(Frame[Unknown] testData, Frame[Unknown] metaData = as.frame("NULL"), Frame[Unknown] pip,
Frame[Unknown] applyFunc, Matrix[Double] hp, Boolean isLastLabel = TRUE,List[Unknown] exState, List[Unknown] iState, Boolean correctTypos=FALSE)
return (Matrix[Double] eXtest)
{
no_of_flag_vars = 5
[schema, mask, fdMask, maskY] = topk::prepareMeta(testData, metaData)
pip = removeEmpty(target=pip, margin="cols")
applyFunc = removeEmpty(target=applyFunc, margin="cols")
metaList = list(mask=mask, schema=schema, fd=fdMask, applyFunc=as.frame("NULL"))
ctx = list(prefix="----"); #TODO include seed
# separate the label
[Xtest, Ytest] = topk::getLabel(testData, isLastLabel)
# always recode the label
if(maskY == 1) {
M = as.frame(exState[1])
eYtest = transformapply(target=Ytest, spec= "{ids:true, recode:[1]}", meta=M);
}
else
{
eYtest = as.matrix(Ytest)
}
# # # when the evaluation function is called first we also compute and keep hyperparams of target application
ctx = list(prefix="apply Pipeline")
[Xtest, Xt] = topk::runStringPipeline(Xtest, Xtest, schema, mask, FALSE, correctTypos, ctx)
# # # if mask has 1s then there are categorical features
M = as.frame(exState[2])
if(sum(mask) > 0)
{
index = vectorToCsv(mask)
jspecR = "{ids:true, recode:["+index+"]}"
eXtest = transformapply(target=Xtest, spec=jspecR, meta=M);
}
else
eXtest = as.matrix(Xtest)
metaList["applyFunc"] = applyFunc
no_of_param = as.scalar(hp[1, 1]) + 1
hp_width= hp[1, 2:no_of_param]
hp_matrix = matrix(hp_width, rows=ncol(pip), cols=ncol(hp_width)/ncol(pip))
pipList = list(ph = pip, hp = hp_matrix, flags = no_of_flag_vars)
for(i in 1:length(iState)) {
op = as.scalar(pip[1,i])
XtestClone = eXtest
applyOp = toString(as.scalar(applyFunc[1,i]))
dataFlag = as.scalar(hp_matrix[i, ncol(hp_matrix)])
[iState, L] = remove(iState, 1)
[eXtest, executeFlag] = getDataFromFlag(eXtest, mask, dataFlag)
L2 = list(eXtest)
L = as.list(L)
for(k in 1:length(L)) {
L2 = append(L2, L[k])
}
if(executeFlag == 1 & applyOp != "NA") {
eXtest = eval(applyOp, L2);
eXtest = confirmDataFromMask (eXtest, XtestClone, mask, dataFlag)
eXtest = confirmMetaFromMask (eXtest, mask)
}
else {
print("not applying "+op+" executeFlag = 0")
}
}
}
getDataFromFlag = function(Matrix[Double] X, Matrix[Double] mask, Integer dataFlag)
return(Matrix[Double] X,Integer executeFlag)
{
executeFlag = 1
if(dataFlag == 0)
{
if(sum(mask) == ncol(mask))
executeFlag = 0
else {
# take numerics out and remove categorical
X = removeEmpty(target=X, margin = "cols", select = (mask == 0))
}
}
else if(dataFlag == 1)
{
if(sum(mask) == 0)
executeFlag = 0
else {
# take categorical out and remove numerics
X = removeEmpty(target=X, margin = "cols", select = mask)
}
}
else X = X
}
confirmMetaFromMask = function(Matrix[Double] X, Matrix[Double] mask)
return (Matrix[Double] X)
{
if((sum(mask) > 0) & (ncol(X) == ncol(mask)))
{
# get the max + 1 for nan replacement
nanMask = is.na(X)
# replace nan
X = replace(target = X, pattern = NaN, replacement = 9999)
# take categorical out
cat = removeEmpty(target=X, margin="cols", select = mask)
# round categorical (if there is any floating point)
cat = round(cat)
less_than_1_mask = cat < 1
less_than_1 = less_than_1_mask * 9999
cat = (cat * (less_than_1_mask == 0)) + less_than_1
# reconstruct original X
X = X * (mask == 0)
q = table(seq(1, ncol(cat)), removeEmpty(target=seq(1, ncol(mask)), margin="rows",
select=t(mask)), ncol(cat), ncol(X))
X = (cat %*% q) + X
# put nan back
nanMask = replace(target = nanMask, pattern = 1, replacement = NaN)
X = X + nanMask
}
}
confirmDataFromMask = function(Matrix[Double] nX, Matrix[Double] originalX, Matrix[Double] mask, Integer dataFlag)
return (Matrix[Double] X)
{
if(dataFlag == 0 & (sum(mask) > 0) & (sum(mask) != ncol(originalX)))
{
maxDummy = max(replace(target=nX, pattern=NaN, replacement=0)) + 1
nX = replace(target = nX, pattern = NaN, replacement = maxDummy)
# X without numerics
Xcat = removeEmpty(target=originalX, margin="cols", select=mask)
nanMask = is.na(Xcat)
Xcat = replace(target = Xcat, pattern = NaN, replacement = -1111)
# reconstruct the original matrix
p = table(seq(1, ncol(nX)), removeEmpty(target=seq(1, ncol(mask)), margin="rows",
select=t(mask==0)), ncol(nX), ncol(originalX))
q = table(seq(1, ncol(Xcat)), removeEmpty(target=seq(1, ncol(mask)), margin="rows",
select=t(mask)), ncol(Xcat), ncol(originalX))
X = (nX %*% p) + (Xcat %*% q)
X = replace(target = X, pattern = maxDummy, replacement = NaN)
X = replace(target = X, pattern = -1111, replacement = NaN)
}
else if(dataFlag == 1 & (sum(mask) > 0) & (sum(mask) != ncol(originalX)))
{
maxDummy = max(replace(target=nX, pattern=NaN, replacement=0)) + 1
nX = replace(target = nX, pattern = NaN, replacement = maxDummy)
# X without categorical
Xnum = removeEmpty(target=originalX, margin="cols", select=(mask==0))
nanMask = is.na(Xnum)
Xnum = replace(target = Xnum, pattern = NaN, replacement = -1111)
# reconstruct the original matrix
p = table(seq(1, ncol(Xnum)), removeEmpty(target=seq(1, ncol(mask)), margin="rows",
select=t(mask==0)), ncol(Xnum), ncol(originalX))
q = table(seq(1, ncol(nX)), removeEmpty(target=seq(1, ncol(mask)), margin="rows",
select=t(mask)), ncol(nX), ncol(originalX))
X = (nX %*% q) + (Xnum %*% p)
X = replace(target = X, pattern = maxDummy, replacement = NaN)
X = replace(target = X, pattern = -1111, replacement = NaN)
}
else X = nX
}