| #------------------------------------------------------------- |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| #------------------------------------------------------------- |
| |
| # Applies every operator of a pipeline, in column order, to the feature matrix X. |
| # |
| # pipeline         frame whose first row holds one operator name per column |
| # X, Y             feature matrix and label vector threaded through the operators |
| # mask             column mask forwarded to matrixToList and confirmMeta |
| # schema           part of the signature; not read inside this function |
| # hyperParameters  list whose i-th entry is the hyper-parameter row of operator i |
| # verbose          when TRUE, prints the inputs before the operators run |
| # |
| # Returns the transformed X and Y. |
| s_executePipeline = function(Frame[String] pipeline, Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, |
|   Frame[Unknown] schema, List[Unknown] hyperParameters, Boolean verbose) |
|   return (Matrix[Double] X, Matrix[Double] Y) |
| { |
|   print("PIPELINE EXECUTION START ... ") |
|   if(verbose) { |
|     print("checks rows in X = "+nrow(X)+" rows in Y = "+nrow(Y)+" cols in X = "+ncol(X)+" col in Y = "+ncol(Y)) |
|     print("pipeline in execution "+toString(pipeline)) |
|     print("pipeline hps "+toString(hyperParameters)) |
|     print("mask "+toString(mask)) |
|     print("col max"+toString(colMaxs(X))) |
|     # NOTE(review): empty loop kept on purpose; presumably a statement-block cut |
|     # for the SystemDS compiler - confirm before removing |
|     while(FALSE){} |
|   } |
|   numOps = ncol(pipeline) |
|   for(step in 1:numOps) { |
|     opName = as.scalar(pipeline[1, step]) |
|     hpRow = as.matrix(hyperParameters[step]) |
|     [opArgs, labelAttached] = matrixToList(X, Y, mask, hpRow, opName) |
|     # the operator is looked up by name and called with the assembled argument list |
|     X = eval(opName, opArgs) |
| |
|     if(labelAttached) { |
|       # Y was handed to the operator: take the last column of its result as the |
|       # new Y and keep the remaining columns as X |
|       lastCol = ncol(X) |
|       Y = X[, lastCol] |
|       X = X[, 1:(lastCol - 1)] |
|     } |
| |
|     X = confirmMeta(X, mask) |
|   } |
|   # NOTE(review): same deliberate empty loop as above - confirm before removing |
|   while(FALSE){} |
| } |
| |
| # Assembles the positional argument list for one pipeline operator. |
| # |
| # X, Y, mask  data for the operator; Y and mask are only added when requested (see below) |
| # p           one-row matrix: the operator's hyper-parameters first, then the Y flag in |
| #             the second-to-last column and the verbose flag in the last column |
| # op          operator name; "mice" and "pca" are treated specially |
| # |
| # Returns l, the list [X, (Y), (mask), hyper-parameters..., (FALSE)], and hasY, which |
| # is TRUE when Y was put into the list. |
| matrixToList = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, Matrix[Double] p, String op) |
|   return (List[Unknown] l, Boolean hasY) |
| { |
|   numCols = ncol(p) |
|   verboseFlag = as.scalar(p[1, numCols]) |
|   labelFlag = as.scalar(p[1, numCols - 1]) |
| |
|   # X always comes first; Y follows when the Y flag is set |
|   l = list(X) |
|   hasY = (labelFlag == 1) |
|   if(hasY) |
|     l = append(l, Y) |
| |
|   # everything in front of the two trailing flags is an actual hyper-parameter |
|   numHp = numCols - 2 |
|   if(numHp > 0) { |
|     if(op == "mice") |
|       l = append(l, mask) |
|     if(op == "pca") { |
|       # the first hyper-parameter is replaced by ncol(X) minus its value |
|       p[1, 1] = as.integer(ncol(X) - as.scalar(p[1, 1])) |
|     } |
|     for(k in 1:numHp) |
|       l = append(l, as.scalar(p[1, k])) |
|   } |
| |
|   # when the verbose flag is set, a trailing FALSE is passed to the operator |
|   if(verboseFlag == 1) |
|     l = append(l, FALSE) |
| } |
| |
| # Re-establishes integer values in the mask-selected columns of X after an operator ran. |
| # |
| # X     feature matrix produced by a pipeline operator |
| # mask  row vector with one entry per column of X; non-zero entries select the |
| #       columns to be rounded (the categorical ones) |
| # |
| # Returns X with the selected columns rounded up (ceil) and every NaN left in place. |
| # X is returned unchanged when the mask selects nothing, or when the operator changed |
| # the number of columns (e.g. "pca") so that the mask no longer lines up with X. |
| confirmMeta = function(Matrix[Double] X, Matrix[Double] mask) |
|   return (Matrix[Double] X) |
| { |
|   # the column-count check prevents a dimension mismatch in removeEmpty and in the |
|   # mask multiplication after operators that add or drop columns |
|   if((sum(mask) > 0) & (ncol(X) == ncol(mask))) |
|   { |
|     # remember where the NaNs are so they can be restored at the end |
|     nanMask = is.na(X) |
|     # replace NaN by a placeholder so the arithmetic below is not poisoned |
|     X = replace(target = X, pattern = NaN, replacement = 9999) |
|     # take categorical out |
|     cat = removeEmpty(target=X, margin="cols", select = mask) |
|     # round categorical up (if there is any floating point) |
|     cat = ceil(cat) |
|     print("cat less than zero") |
|     print(sum(cat <= 0)) |
|     # reconstruct original X: zero out the categorical columns, then scatter the |
|     # rounded values back to their original positions via the selection matrix q |
|     X = X * (mask == 0) |
|     q = table(seq(1, ncol(cat)), removeEmpty(target=seq(1, ncol(mask)), margin="rows", |
|       select=t(mask)), ncol(cat), ncol(X)) |
|     X = (cat %*% q) + X |
|     # put nan back: adding NaN turns the placeholder cells into NaN again |
|     nanMask = replace(target = nanMask, pattern = 1, replacement = NaN) |
|     X = X + nanMask |
|   } |
| } |