# blob: 7d361ea5be977a0e8694bd633b9b505268ff9630 [file] [log] [blame]
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------
# metaData[3, ncol(data)] : row 1 stores the schema, row 2 stores the mask, row 3 stores the FD mask; the last column describes the label
source("scripts/pipelines/scripts/utils.dml") as utils;
source("scripts/pipelines/scripts/enumerateLogical.dml") as lg;
# Entry point of the top-k cleaning script. The stages run strictly in this order:
#   1. derive schema / mask / FD mask from the meta data (prepareMeta)
#   2. split off the label column and encode it
#   3. compute the score of the uncleaned data (getDirtyScore), which also returns evalFunHp
#   4. string processing, recoding of the features and sampling of the training data
#   5. enumeration of logical pipelines starting from a fixed seed population
#   6. enumeration of physical pipelines via bandit, whose result is returned as perf
# Each stage prints its elapsed time in seconds (time() differences divided by 1e9).
s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = as.frame("NULL"), Frame[Unknown] metaData = as.frame("NULL"), Frame[Unknown] primitives,
  Frame[Unknown] parameters, Matrix[Double] cmr = matrix("4 0.7 1", rows=1, cols=3), String evaluationFunc, Matrix[Double] evalFunHp, Integer topK = 5,
  Integer resource_val = 20, Double sample = 0.1, Boolean cv=TRUE, Integer cvk = 2, Boolean isLastLabel = TRUE, Boolean correctTypos=FALSE, String output)
  return(Boolean perf)
  # return (Frame[Unknown] topKPipelines, Matrix[Double] topKHyperParams, Matrix[Double] topKScores, Frame[Unknown] bestLogical,
  # Frame[Unknown] features, Double dirtyScore, Matrix[Double] evalFunHp)
{
  tStart = time(); print("TopK-Cleaning:");
  # placeholders for the test split; they are only replaced when cv is FALSE
  Xtest = as.frame("0")
  Ytest = as.frame("0")
  ctx = list(prefix="----"); #TODO include seed

  # ---- stage 1: meta data ----
  # the meta information is kept as a list so more entries can be added later
  [schema, mask, fdMask, maskY] = prepareMeta(dataTrain, metaData)
  metaList = list(mask=mask, schema=schema, fd=fdMask)
  tMeta = time(); print("-- Cleaning - Prepare Metadata: "+(tMeta-tStart)/1e9+"s");

  # ---- stage 2: labels ----
  [Xtrain, Ytrain] = getLabel(dataTrain, isLastLabel)
  if(!cv)
    [Xtest, Ytest] = getLabel(dataTest, isLastLabel)
  # the label is recoded when its mask entry is 1, otherwise it is only cast to a matrix
  if(maskY != 1) {
    eYtrain = as.matrix(Ytrain)
    eYtest = as.matrix(Ytest)
  }
  else {
    labelSpec = "{ids:true, recode:[1]}"
    [eYtrain, M] = transformencode(target=Ytrain, spec=labelSpec);
    eYtest = transformapply(target=Ytest, spec=labelSpec, meta=M);
  }
  tLabel = time(); print("-- Cleaning - Prepare Labels: "+(tLabel-tMeta)/1e9+"s");

  # ---- stage 3: dirty score ----
  # the first call of the evaluation function also yields the hyper-parameters of the target application
  print("-- Cleaning - Get Dirty Score: ");
  [dirtyScore, evalFunHp] = getDirtyScore(X=Xtrain, Y=eYtrain, Xtest=Xtest, Ytest=eYtest, evaluationFunc=evaluationFunc,
    metaList=metaList, evalFunHp=evalFunHp, sample=sample, trainML=1, cv=cv, cvk=cvk, ctx=ctx)
  tDirty = time(); print("---- finalized in: "+(tDirty-tLabel)/1e9+"s");

  # ---- stage 4: data preparation ----
  print("-- Cleaning - Data Preparation (strings, transform, sample): ");
  [Xtrain, Xtest] = runStringPipeline(Xtrain, Xtest, schema, mask, cv, correctTypos, ctx)
  # columns flagged in the mask are recoded, yielding numeric feature matrices
  print("---- feature transformations to numeric matrix");
  [eXtrain, eXtest] = recodeData(Xtrain, Xtest, mask, cv, "recode")
  # sample the training data used for pipeline enumeration
  # TODO why recoding/sampling twice (within getDirtyScore)
  print("---- class-stratified sampling of feature matrix w/ f="+sample);
  [eXtrain, eYtrain] = utils::doSample(eXtrain, eYtrain, sample, TRUE)
  tPrep = time(); print("---- finalized in: "+(tPrep-tDirty)/1e9+"s");

  # ---- stage 5: logical pipelines ----
  # seed populations: the first cell of every row holds the number of operators in that row,
  # the remaining cells hold the operator names padded with "0"
  logicalSeedCI = frame([
    "4", "ED", "MVI", "OTLR", "EC", "0", "0", "0", "0",
    "4", "ED", "MVI", "CI", "DUMMY","0","0", "0", "0",
    "4", "OTLR", "EC", "CI", "DUMMY", "0", "0","0", "0",
    "6", "MVI", "OTLR", "ED", "EC", "CI", "DUMMY", "0", "0",
    "4", "ED", "MVI", "CI", "DUMMY", "0", "0", "0", "0",
    "4", "MVI", "SCALE", "CI", "DUMMY", "0", "0", "0", "0",
    "4", "ED", "EC", "CI", "DUMMY", "0", "0", "0", "0",
    "4", "MVI", "OTLR", "CI", "DUMMY", "0", "0", "0", "0",
    "5", "MVI", "OTLR", "EC", "CI", "DUMMY", "0", "0", "0",
    "7", "ED", "MVI", "OTLR", "EC", "SCALE", "CI", "DUMMY", "0"
  ], rows=10, cols=9)
  logicalSeedNoCI = frame([
    "4", "ED", "MVI", "OTLR", "EC", "0", "0",
    "3", "ED", "MVI", "DUMMY", "0","0","0",
    "3", "OTLR", "EC", "DUMMY", "0","0","0",
    "5", "MVI", "OTLR", "ED", "EC", "DUMMY", "0",
    "3", "ED", "MVI", "DUMMY", "0", "0", "0",
    "3", "MVI", "SCALE", "DUMMY", "0", "0", "0",
    "3", "ED", "EC", "DUMMY", "0", "0", "0",
    "3", "MVI", "OTLR", "DUMMY", "0", "0", "0",
    "4", "MVI", "OTLR", "EC", "DUMMY", "0", "0",
    "6", "ED", "MVI", "OTLR", "EC", "SCALE", "DUMMY"
  ], rows=10, cols=7)

  # the seeds containing CI are only used for non-empty labels with fewer than 10 rows in their contingency table
  numClasses = nrow(table(eYtrain, 1))
  if(nrow(eYtrain) <= 0 | numClasses >= 10)
    seeds = logicalSeedNoCI
  else
    seeds = logicalSeedCI

  # the operators of the first seed row are handed over as categories
  lastOpCol = as.integer(as.scalar(seeds[1, 1])) + 1
  category = seeds[1, 2:lastOpCol]

  print("-- Cleaning - Enum Logical Pipelines: ");
  [bestLogical, score] = lg::enumerateLogical(X=eXtrain, y=eYtrain, Xtest=eXtest, ytest=eYtest, cmr=cmr,
    cat=category, population=seeds[2:nrow(seeds)], max_iter=ceil(resource_val/topK), metaList = metaList,
    evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, primitives=primitives, param=parameters,
    num_inst=3 , num_exec=2, cv=cv, cvk=cvk, verbose=TRUE, ctx=ctx)
  tLogical = time(); print("---- finalized in: "+(tLogical-tPrep)/1e9+"s");

  # ---- stage 6: physical pipelines ----
  # placeholders belonging to the commented-out multi-value return above
  topKPipelines = as.frame("NULL"); topKHyperParams = matrix(0,0,0); topKScores = matrix(0,0,0); features = as.frame("NULL")
  # # [topKPipelines, topKHyperParams, topKScores, features] =
  perf = bandit(X_train=eXtrain, Y_train=eYtrain, X_test=eXtest, Y_test=eYtest, metaList=metaList,
    evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, lp=bestLogical, primitives=primitives, param=parameters, baseLineScore=dirtyScore,
    k=topK, R=resource_val, cv=cv, output=output, verbose=TRUE);
  tPhysical = time(); print("-- Cleaning - Enum Physical Pipelines: "+(tPhysical-tLogical)/1e9+"s");
}
# Splits the meta information into schema, mask and FD mask of the feature columns
# (all columns but the last) plus the mask entry of the last column (maskY).
# If the first cell of metaData is the string "NULL", the information is derived from
# data via detectSchema: STRING and BOOLEAN columns get a mask value of 1, and the
# FD mask is set to the same values as the mask.
# Otherwise row 1 of metaData is read as schema, row 2 as mask and row 3 as FD mask.
prepareMeta = function(Frame[Unknown] data, Frame[Unknown] metaData)
  return(Frame[String] schema, Matrix[Double] mask, Matrix[Double] fdMask, Integer maskY)
{
  if(as.scalar(metaData[1, 1]) != "NULL") {
    # user-provided meta data: the last column belongs to the label
    lastCol = ncol(metaData)
    schema = metaData[1, 1:(lastCol - 1)]
    mask = as.matrix(metaData[2, 1:(lastCol - 1)])
    fdMask = as.matrix(metaData[3, 1:(lastCol - 1)])
    maskY = as.integer(as.scalar(metaData[2, lastCol]))
  }
  else {
    print("creating meta data")
    detected = detectSchema(data)
    flags = matrix(0, rows=1, cols=ncol(data))
    # flag every column whose detected type is STRING or BOOLEAN
    for(i in 1:ncol(detected)) {
      colType = as.scalar(detected[1, i])
      if(colType == "STRING" | colType == "BOOLEAN")
        flags[1, i] = 1
    }
    numFeatures = ncol(flags) - 1
    schema = detected[, 1:(ncol(detected) - 1)]
    mask = flags[, 1:numFeatures]
    fdMask = flags[, 1:numFeatures]
    maskY = as.integer(as.scalar(flags[, ncol(flags)]))
  }
}
# Separates the label from the features.
# With isLastLabel = TRUE the last column of data is returned as Y and all preceding
# columns as X; otherwise X is the unmodified data and Y a 1x1 frame holding "0".
getLabel = function(Frame[Unknown] data, Boolean isLastLabel)
  return(Frame[Unknown] X, Frame[Unknown] Y)
{
  if(!isLastLabel) {
    # no label column present: hand back the data and a placeholder label
    X = data
    Y = as.frame("0")
  }
  else {
    labelCol = ncol(data)
    X = data[, 1:(labelCol - 1)]
    Y = data[, labelCol]
  }
}
# Applies utils::stringProcessing to the feature frames.
# With cv = TRUE only Xtrain is processed and Xtest is returned as passed in.
# Otherwise train and test rows are stacked, processed in a single call and split
# again at the original number of training rows.
runStringPipeline = function(Frame[Unknown] Xtrain, Frame[Unknown] Xtest, Frame[String] schema,
  Matrix[Double] mask, Boolean cv, Boolean correctTypos = FALSE, List[Unknown] ctx)
  return(Frame[Unknown] Xtrain, Frame[Unknown] Xtest)
{
  if(!cv) {
    # # # binding train and test to use same dictionary for both
    numTrain = nrow(Xtrain)
    XAll = utils::stringProcessing(data=rbind(Xtrain, Xtest), mask=mask, schema=schema, CorrectTypos=correctTypos, ctx=ctx)
    Xtrain = XAll[1:numTrain,]
    Xtest = XAll[(numTrain + 1):nrow(XAll),]
  }
  else
    Xtrain = utils::stringProcessing(data=Xtrain, mask=mask, schema=schema, CorrectTypos=correctTypos, ctx=ctx)
}
# Scores the data before any cleaning is applied.
#
# X, Y            training features (frame) and encoded training labels
# Xtest, Ytest    test features and labels, passed to the evaluation function when cv is FALSE
# evaluationFunc  name of the evaluation function, invoked via eval when cv is FALSE
# metaList        list whose 'mask' entry is compared against a freshly detected mask
# evalFunHp       hyper-parameters forwarded to the evaluation
# sample          sampling argument forwarded to utils::doSample
# trainML         forwarded to crossV / the evaluation function as their trainML argument
# cv, cvk         cross validation switch and cvk argument forwarded to crossV
# ctx             list providing the "prefix" string used for log output
#
# Returns the first cell of the evaluation result as dirtyScore and the remaining
# cells of its first row as evalFunHp.
getDirtyScore = function(Frame[Unknown] X, Matrix[Double] Y, Frame[Unknown] Xtest, Matrix[Double] Ytest, String evaluationFunc, List[Unknown] metaList,
  Matrix[Double] evalFunHp, Double sample, Integer trainML, Boolean cv, Integer cvk, List[Unknown] ctx=list() )
  return(Double dirtyScore, Matrix[Double] evalFunHp)
{
  # detect a mask from the data itself: STRING and BOOLEAN columns are flagged with 1
  dschema = detectSchema(X)
  dmask = matrix(0, rows=1, cols=ncol(dschema))
  for(i in 1:ncol(dschema))
    if(as.scalar(dschema[1, i]) == "STRING" | as.scalar(dschema[1, i]) == "BOOLEAN")
      dmask[1, i] = 1
  prefix = as.scalar(ctx["prefix"]);
  # fall back to the detected mask as soon as it differs from the provided one in any column
  mask = as.matrix(metaList['mask'])
  mask = ifelse(sum(mask == dmask) < ncol(mask), dmask, mask)
  [eXtrain, eXtest] = recodeData(X, Xtest, mask, cv, "recode")
  # NaN values are replaced by 0 before the data is evaluated
  eXtrain = replace(target=eXtrain, pattern=NaN, replacement = 0)
  eXtest = replace(target=eXtest, pattern=NaN, replacement = 0)
  print(prefix+" sample from train data and dummy code");
  [eXtrain, Ytrain] = utils::doSample(eXtrain, Y, sample, TRUE)
  [eXtrain, eXtest] = recodeData(as.frame(eXtrain), as.frame(eXtest), mask, cv, "dummycode")
  # empty pipeline description: the data is evaluated without any cleaning primitives
  pipList = list(lp = as.frame("NULL"), ph = as.frame("NULL"), hp = as.matrix(0), flags = 0)
  print(prefix+" hyper-parameter tuning");
  # trainML is forwarded from the caller instead of being hard-coded
  if(cv) {
    score = crossV(X=eXtrain, y=Ytrain, cvk=cvk, evalFunHp=evalFunHp,
      pipList=pipList, metaList=metaList, evalFunc=evaluationFunc, trainML = trainML)
  }
  else {
    score = eval(evaluationFunc, list(X=eXtrain, Y=Ytrain, Xtest=eXtest, Ytest=Ytest, Xorig=as.matrix(0), evalFunHp=evalFunHp, trainML = trainML))
  }
  dirtyScore = as.scalar(score[1, 1])
  evalFunHp = score[1, 2:ncol(score)]
}
# Converts the train / test frames into numeric matrices.
# If the mask flags at least one column, the flagged columns are encoded with the
# transformation named by code (the callers in this file pass "recode" or "dummycode");
# the encoder is built on Xtrain and, when cv is FALSE, applied to Xtest with the same meta data.
# In every other case the frames are simply cast to matrices.
recodeData = function(Frame[Unknown] Xtrain, Frame[Unknown] Xtest, Matrix[Double] mask, Boolean cv, String code)
  return(Matrix[Double] eXtrain, Matrix[Double] eXtest)
{
  if(!(sum(mask) > 0)) {
    # if no categorical value exist then just cast the frame into matrix
    eXtrain = as.matrix(Xtrain)
    eXtest = as.matrix(Xtest)
  }
  else {
    # build the transform specification from the flagged column indexes
    flaggedCols = vectorToCsv(mask)
    jspecR = "{ids:true, "+code+":["+flaggedCols+"]}"
    [eXtrain, X_meta] = transformencode(target=Xtrain, spec=jspecR);
    if(cv)
      eXtest = as.matrix(Xtest)
    else
      eXtest = transformapply(target=Xtest, spec=jspecR, meta=X_meta);
  }
}