| #------------------------------------------------------------- |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| #------------------------------------------------------------- |
| |
| # Driver script: reads the dataset and its side inputs, runs the cleaning-pipeline search |
| # and writes whether the cleaned test accuracy beats the dirty one. |
| # Positional arguments: $1 data (csv with header), $2 meta info, $3 primitives, |
| # $4 hyper-parameters, $5 output file for the boolean verdict. |
| |
| # ---- inputs ---- |
| rawData = read($1, data_type="frame", format="csv", header=TRUE, |
|   naStrings= ["NA", "null"," ","NaN", "nan", "", "?"]); |
| metaInfo = read($2, data_type="frame", format="csv", header=FALSE); |
| primitiveDefs = read($3, data_type = "frame", format="csv", header= TRUE) |
| hyperParams = read($4, data_type = "frame", format="csv", header= FALSE) |
| |
| # ---- search configuration ---- |
| pipelineLength = 5 |
| sampleRatio = 0.4 |
| banditR = 40 |
| # accuracy flag |
| useWeightedAccuracy = TRUE |
| |
| # TODO do it automatically |
| # Generate logical Pipeline |
| logicalPipeline = cbind(as.frame("OTLR"), as.frame("MVI")) |
| # logicalPipeline = cbind(logicalPipeline, as.frame("CI")) |
| # logicalPipeline = cbind(logicalPipeline, as.frame("NR")) |
| |
| # ---- run the search; the second output holds [dirty accuracy, clean accuracy] ---- |
| [bestPipelines, accuracies] = startCleaning(rawData, logicalPipeline, "classification", metaInfo, |
|   primitiveDefs, hyperParams, pipelineLength, sampleRatio, useWeightedAccuracy, banditR, TRUE) |
| cleaningHelped = as.logical(as.scalar(accuracies[1,1] < accuracies[1,2])) |
| write(cleaningHelped, $5, format = "text") |
| |
| # run summary (assembled but not written anywhere in this script) |
| info = "meta info \n" |
| info = append(info, "pipeline length "+pipelineLength) |
| info = append(info,"sample size of data "+sampleRatio) |
| |
| # Runs the cleaning-pipeline search on one dataset and evaluates the best pipeline found. |
| # |
| # Steps, in order: schema validation (dropInvalidType), recoding of the columns selected by the |
| # mask, separation of the last column as class label, sampling (doSample), two stratified splits |
| # (getDataSplits), a baseline model on the uncleaned data, the pipeline search (bandit) and the |
| # final evaluation of the first returned pipeline (testBestPipeline). |
| # |
| # Parameters: |
| #   F                 input data frame; its last column is used as the class label |
| #   logical           logical pipeline, forwarded to bandit |
| #   targetApplication not referenced in the body |
| #   metaInfo          meta frame with at least 3 rows; row 1 (columns 2..n) is read as schema and |
| #                     row 2 (columns 2..n) as mask; the third row is not read here |
| #   primitives, param forwarded to bandit |
| #   k, R              forwarded to bandit |
| #   sample            ratio forwarded to doSample |
| #   isWeighted        forwarded to getAccuracy and bandit |
| #   verbose           prints the class distribution per split and the baseline accuracy |
| # Returns: |
| #   result            cbind of the pipelines, a NaN column followed by the hyper-parameters, |
| #                     and the accuracies returned by bandit |
| #   res               output of testBestPipeline for the first pipeline (dirty and clean accuracy) |
| startCleaning = function(Frame[Unknown] F, Frame[Unknown] logical, String targetApplication = "classification", |
| Frame[Unknown] metaInfo, Frame[Unknown] primitives, Frame[Unknown] param, Integer k, Double sample, |
| Boolean isWeighted = TRUE, Integer R=50, Boolean verbose = FALSE) |
| return (Frame[Unknown] result, Matrix[Double] res) |
| { |
| |
| # placeholder values for both outputs; overwritten further down |
| res = as.matrix(0) |
| result = as.frame(0) |
| if(nrow(metaInfo) < 3) |
| stop("incomplete meta info") |
| |
| # initialize variables |
| eX = as.matrix(0) |
| eY = as.matrix(0) |
| |
| # the first column of the meta frame is skipped in both rows |
| getSchema = metaInfo[1, 2:ncol(metaInfo)] |
| getMask = as.matrix(metaInfo[2, 2:ncol(metaInfo)]) |
| |
| # validate schema |
| X = dropInvalidType(F, getSchema) |
| |
| if(sum(getMask) > 0 ) |
| { |
| # turn the mask into a list of column positions (assumes a 0/1 mask - TODO confirm) |
| idx = getMask * t(seq(1, ncol(X))) |
| idx = removeEmpty(target = idx, margin = "cols") |
| index = vectorToCsv(idx) |
| jspecR = "{ids:true, recode:["+index+"]}" |
| [eX, X_meta] = transformencode(target=X, spec=jspecR); |
| # change the schema |
| getSchema = map(getSchema, "x->x.replace(\"STRING\", \"INT64\")") |
| getSchema = map(getSchema, "x->x.replace(\"BOOLEAN\", \"INT64\")") |
| |
| } else { |
| eX = as.matrix(X) |
| } |
| |
| # the last column is the class label, everything before it is a feature |
| eY = eX[, ncol(X)] |
| eX = eX[, 1:ncol(X) - 1] |
| |
| if(min(eY) == 0) |
| eY = eY + 1 # add one to class labels so that the min label is greater than zero |
| |
| getMask = getMask[, 1:ncol(getMask) - 1] # strip the mask of class label |
| getSchema = getSchema[, 1:ncol(getSchema) - 1] # strip the schema of class label |
| |
| [eX, eY] = doSample(eX, eY, sample) |
| # get train test and validation set with balanced class distribution |
| # first split with ratio 0.6 into trainset/test, then the trainset with ratio 0.7 into train/validation |
| [X_trainset, y_trainset, X_test, y_test] = getDataSplits(eX, eY, 0.6, FALSE) |
| [X_train, y_train, X_val, y_val] = getDataSplits(X_trainset, y_trainset, 0.7, FALSE) |
| |
| if(verbose) { |
| print("classes in each split ") |
| trc = table(y_train, 1) |
| print("class distribution in train \n"+toString(trc)) |
| tsc = table(y_test, 1) |
| print("class distribution in test \n"+toString(tsc)) |
| tvc = table(y_val, 1) |
| print("class distribution in validation \n"+toString(tvc)) |
| } |
| |
| # get test accuracy first |
| # classify without cleaning |
| # NaN cells are replaced by 0 only for this baseline model; bandit receives the unmodified splits |
| train_tmp = replace(target = X_train, pattern = NaN, replacement=0) |
| test_tmp = replace(target = X_test, pattern = NaN, replacement=0) |
| |
| betas = multiLogReg(X=train_tmp, Y=y_train, icpt=2, tol=1e-9, reg=0.00001, maxi=50, |
| maxii=50, verbose=FALSE) |
| [d_prob, d_yhat, d_accuracy] = multiLogRegPredict(test_tmp, betas, y_test, FALSE) |
| # the accuracy returned by multiLogRegPredict is overwritten by getAccuracy |
| d_accuracy = getAccuracy(y_test, d_yhat, isWeighted) |
| if(verbose) |
| print("dirty accuracy "+d_accuracy) |
| |
| # placeholder values; all three are overwritten by the bandit call below |
| pip = as.frame("") |
| hp = as.matrix(0) |
| acc = as.matrix(0) |
| |
| # NOTE(review): the last argument is a literal TRUE rather than the verbose parameter - confirm intended |
| [pip, hp, acc] = bandit(X_train, y_train, X_val, y_val, getMask, getSchema, logical, primitives, |
| param, k, d_accuracy, isWeighted, R, TRUE); |
| |
| # a NaN column is placed between the pipelines and their hyper-parameters in the result frame |
| tmp_hp = cbind(matrix(NaN, nrow(hp), 1), hp) |
| result = cbind(pip, as.frame(tmp_hp)) |
| result = cbind(result, as.frame(acc)) |
| # abort when the first accuracy entry is NaN |
| if(as.scalar((is.na(acc[1,1]))) == 1) { |
| stop("warning: no best pipeline found") |
| } |
| # evaluate the first pipeline and its hyper-parameters on the test split |
| res = testBestPipeline(pip[1,], hp[1,], X_train, y_train, X_test, y_test, getMask, |
| getSchema, as.scalar(acc[1,1])) |
| |
| } |
| |
| # Compares a classifier trained on the uncleaned data with one trained on data cleaned by the |
| # given pipeline, both scored on the test split. |
| # |
| #   pip      one-row frame, one pipeline operator per column |
| #   hp       one-row matrix of hyper-parameters; for every operator it holds a count followed |
| #            by that many values |
| #   cmask, schema  forwarded to executePipeline |
| #   valAcc   validation accuracy, only printed |
| # Returns a 1x2 matrix [dirty accuracy, clean accuracy]. |
| testBestPipeline = function(Frame[Unknown] pip, Matrix[Double] hp, Matrix[Double] X_train, Matrix[Double] y_train, |
|   Matrix[Double] X_test, Matrix[Double] y_test, Matrix[Double] cmask, Frame[Unknown] schema, Double valAcc) |
|   return (Matrix[Double] result) { |
| |
|   # slice the flat hyper-parameter row into one matrix per pipeline operator |
|   hpList = list(); |
|   cursor = 1 |
|   op = 1 |
|   while(op <= ncol(pip)) |
|   { |
|     numValues = as.integer(as.scalar(hp[1,cursor])) |
|     last = as.integer(cursor + numValues) |
|     hpList = append(hpList, hp[1, (cursor+1):last]) |
|     cursor = last + 1 |
|     op = op + 1 |
|   } |
| |
|   # baseline: classify without cleaning, NaN cells replaced by 0 in separate copies |
|   dirty_train = replace(target = X_train, pattern = NaN, replacement = 0) |
|   dirty_test = replace(target = X_test, pattern = NaN, replacement = 0) |
|   betas = multiLogReg(X=dirty_train, Y=y_train, icpt=2, tol=1e-9, reg=0.00001, maxi=50, |
|     maxii=50, verbose=FALSE) |
|   [d_prob, d_yhat, d_accuracy] = multiLogRegPredict(dirty_test, betas, y_test, FALSE) |
|   [confusionCount_d, confusionAVG_d] = confusionMatrix(P=d_yhat, Y=y_test) |
| |
|   print("dirty confusion matrix \n"+toString(confusionCount_d)) |
| |
|   # clean the untouched train and test data using the best pipeline |
|   [X_train_clean, Y_train_clean] = executePipeline(pip, X_train, y_train, cmask, schema, hpList, FALSE) |
|   [X_test_clean, Y_test_clean] = executePipeline(pip, X_test, y_test, cmask, schema, hpList, FALSE) |
|   while(FALSE){} |
| |
|   # classify after cleaning |
|   betas = multiLogReg(X=X_train_clean, Y=Y_train_clean, icpt=2, tol=1e-9, reg=0.00001, maxi=50, |
|     maxii=50, verbose=FALSE) |
|   [c_prob, c_yhat, c_accuracy] = multiLogRegPredict(X_test_clean, betas, Y_test_clean, FALSE) |
|   [confusionCount_c, confusionAVG_c] = confusionMatrix(P=c_yhat, Y=Y_test_clean) |
| |
|   print("accuracy of dirty accuracy "+d_accuracy) |
|   print("accuracy of val accuracy "+valAcc) |
|   print("accuracy of test accuracy "+c_accuracy) |
|   print("clean confusion matrix \n"+toString(confusionCount_c)) |
| |
|   result = cbind(as.matrix(d_accuracy), as.matrix(c_accuracy)) |
| } |
| |
| |
| # Stratified sampling: keeps floor(count * ratio) rows of every class. |
| # |
| # Sampling only happens when floor(nrow(eX) * ratio) exceeds MIN_SAMPLE (10000); otherwise the |
| # inputs are returned unchanged. |
| # |
| #   eX     feature matrix |
| #   eY     class labels, one per row of eX |
| #   ratio  fraction of every class to keep |
| # Returns the (possibly reduced) features and labels, sorted by label when sampling took place. |
| # Classes whose share rounds down to zero rows contribute nothing to the sample. |
| doSample = function(Matrix[Double] eX, Matrix[Double] eY, Double ratio) |
|   return (Matrix[Double] eX, Matrix[Double] eY) |
| { |
|   MIN_SAMPLE = 10000 |
|   sampled = floor(nrow(eX) * ratio) |
|   if(sampled > MIN_SAMPLE) |
|   { |
|     # sort by label so that every class occupies a contiguous row range |
|     XY = order(target = cbind(eY, eX), by = 1, decreasing=FALSE, index.return=FALSE) |
|     # get the class count |
|     classes = table(eY, 1) |
|     print("classes") |
|     print(toString(classes)) |
|     while(FALSE){} |
|     start_class = 1 |
|     out_s = 1 |
|     out_e = 0 |
|     end_class = 0 |
| |
|     out = matrix(0, sampled, ncol(XY)) |
|     classes_ratio = floor(classes*ratio) |
|     print("class ratio "+toString(classes_ratio)) |
|     for(i in 1:nrow(classes)) |
|     { |
|       n_class = as.scalar(classes[i]) |
|       n_keep = as.scalar(classes_ratio[i]) |
|       # labels that never occur have a zero count and own no rows in XY |
|       if(n_class > 0) |
|       { |
|         end_class = end_class + n_class |
|         # a class whose share rounds down to zero is skipped; slicing 1:0 would abort the script |
|         if(n_keep > 0) |
|         { |
|           class_t = XY[start_class:end_class, ] |
|           out_e = out_e + n_keep |
|           out[out_s:out_e, ] = class_t[1:n_keep, ] |
|           out_s = out_e + 1 |
|         } |
|         start_class = end_class + 1 |
|       } |
|     } |
|     # rows of the preallocated buffer that were never filled are all zero and dropped here |
|     out = removeEmpty(target = out, margin = "rows") |
|     eY = out[, 1] |
|     eX = out[, 2:ncol(out)] |
|   } |
| } |
| |
| # Incomplete draft of the automatic selection of logical pipeline operators. |
| # Looks at the share of missing values, the share of values outside mean +/- 3 standard |
| # deviations and the spread of the class counts, and appends "MVI", "OTLR" and "CI" to a frame. |
| # NOTE(review): no return clause is declared, so the assembled frame is not handed back to a caller. |
| # NOTE(review): the class statistics divide by nrow(ctab), the number of rows of the count |
| # table, not by the number of records - confirm this is intended. |
| generateLogical = function(Matrix[Double] X, Matrix[Double] Y) |
| { |
|   logical = as.frame("") |
| |
|   # missing values are counted before they are replaced by 0 |
|   no_of_mv = sum(is.na(X)) |
|   X = replace(target= X, pattern = NaN, replacement = 0) |
| |
|   # column statistics |
|   colMin = colMins(X) |
|   colMax = colMaxs(X) |
|   colMean = colMeans(X) |
|   colSd = colSds(X) |
| |
|   # cells outside mean +/- 3 standard deviations |
|   upperBound = colMean + 3*colSd |
|   lowerBound = colMean - 3*colSd |
|   outliers = sum(X > upperBound) + sum(X < lowerBound) |
| |
|   # class statistics |
|   ctab = table(Y, 1) |
|   minCatPer = min(ctab) / nrow(ctab) |
|   maxCat = max(ctab) / nrow(ctab) |
| |
|   numCells = nrow(X) * ncol(X) |
|   mv_to_data_ratio = no_of_mv / numCells |
|   out_to_data_ratio = outliers / numCells |
| |
|   if(mv_to_data_ratio > 0.1) { |
|     logical = cbind(logical, as.frame("MVI")) |
|   } |
|   if(out_to_data_ratio > 0.1) { |
|     logical = cbind(logical, as.frame("OTLR")) |
|   } |
|   if((maxCat - minCatPer) > 0.3) { |
|     logical = cbind(logical, as.frame("CI")) |
|   } |
| } |
| |
| # Stratified splitting: every class contributes floor(count * splitRatio) rows to the first |
| # split and its remaining rows to the second one. |
| # |
| #   X           feature matrix |
| #   Y           class labels, one per row of X |
| #   splitRatio  fraction of every class that goes into the first (train) split |
| #   verbose     print the row count and the per-class counts of both splits |
| # Returns X_train, y_train, X_test, y_test; rows come out sorted by label. |
| # A class that is too small to reach one training row ends up entirely in the test split. |
| getDataSplits = function(Matrix[Double] X, Matrix[Double] Y, Double splitRatio, Boolean verbose) |
|   return (Matrix[Double] X_train, Matrix[Double] y_train, Matrix[Double] X_test, |
|   Matrix[Double] y_test) |
| { |
|   # sort by label so that every class occupies a contiguous row range |
|   XY = order(target = cbind(Y, X), by = 1, decreasing=FALSE, index.return=FALSE) |
|   # get the class count |
|   classes = table(Y, 1) |
| |
|   n_split = floor(nrow(X) * splitRatio) |
|   start_class = 1 |
|   train_row_s = 1 |
|   test_row_s = 1 |
|   train_row_e = 0 |
|   test_row_e = 0 |
|   end_class = 0 |
| |
|   # preallocated buffers with one spare row per class; unused rows are dropped at the end |
|   outTrain = matrix(0, n_split+nrow(classes), ncol(XY)) |
|   outTest = matrix(0, (nrow(X) - n_split)+nrow(classes), ncol(XY)) |
| |
|   classes_ratio_train = floor(classes*splitRatio) |
|   classes_ratio_test = classes - classes_ratio_train |
|   if(verbose) { |
|     print("rows "+nrow(X)) |
|     print("classes \n"+toString(classes)) |
|     print("train ratio \n"+toString(classes_ratio_train)) |
|     print("test ratio \n"+toString(classes_ratio_test)) |
|   } |
|   for(i in 1:nrow(classes)) |
|   { |
|     n_class = as.scalar(classes[i]) |
|     n_train = as.scalar(classes_ratio_train[i]) |
|     n_test = as.scalar(classes_ratio_test[i]) |
|     # labels that never occur have a zero count and own no rows in XY |
|     if(n_class > 0) |
|     { |
|       end_class = end_class + n_class |
|       class_t = XY[start_class:end_class, ] |
| |
|       # an empty share must be skipped; slicing with an inverted range such as 1:0 aborts the script |
|       if(n_train > 0) |
|       { |
|         train_row_e = train_row_e + n_train |
|         outTrain[train_row_s:train_row_e, ] = class_t[1:n_train, ] |
|         train_row_s = train_row_e + 1 |
|       } |
|       if(n_test > 0) |
|       { |
|         test_row_e = test_row_e + n_test |
|         outTest[test_row_s:test_row_e, ] = class_t[(n_train+1):nrow(class_t), ] |
|         test_row_s = test_row_e + 1 |
|       } |
|       start_class = end_class + 1 |
|     } |
|   } |
| |
|   outTrain = removeEmpty(target = outTrain, margin = "rows") |
|   outTest = removeEmpty(target = outTest, margin = "rows") |
|   y_train = outTrain[, 1] |
|   X_train = outTrain[, 2:ncol(outTrain)] |
|   y_test = outTest[, 1] |
|   X_test = outTest[, 2:ncol(outTest)] |
| } |
| |
| # Entropy-based class balance score. |
| # |
| #   Y  class labels, one per row |
| # Returns a 1x1 matrix holding H / log(c), where H is the Shannon entropy of the class |
| # distribution and c the number of classes that occur in Y: values near 0 mean unbalanced |
| # data, 1 means perfectly balanced data. With a single class the quotient is 0/0 and the |
| # result is NaN. |
| getBalanceScore = function(Matrix[Double] Y) |
|   return (Matrix[Double] isBalanced) |
| { |
|   # count of instances per class; labels that never occur are dropped because their |
|   # zero count would turn the entropy into NaN (0 * log(0)) |
|   counts = removeEmpty(target = table(Y, 1), margin = "rows") |
|   n = nrow(Y) |
|   numClasses = nrow(counts) |
|   # Shannon entropy H = -sum_i (c_i/n) * log(c_i/n) |
|   p = counts / n |
|   H = -sum(p * log(p)) |
|   # normalise by the maximum entropy log(numClasses), i.e. by the number of classes and |
|   # not by the per-class counts |
|   isBalanced = as.matrix(H / log(numClasses)) |
| } |