# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
s_executePipeline = function(Frame[String] pipeline, Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask,
Frame[Unknown] schema, List[Unknown] hyperParameters, Boolean verbose)
return (Matrix[Double] X, Matrix[Double] Y)
if(verbose) {
print("checks rows in X = "+nrow(X)+" rows in Y = "+nrow(Y)+" cols in X = "+ncol(X)+" col in Y = "+ncol(Y))
print("pipeline in execution "+toString(pipeline))
print("pipeline hps "+toString(hyperParameters))
print("mask "+toString(mask))
print("col max"+toString(colMaxs(X)))
for(i in 1:ncol(pipeline)) {
op = as.scalar(pipeline[1,i])
[hp, withClass] = matrixToList(X, Y, mask, as.matrix(hyperParameters[i]), op)
X = eval(op, hp)
Y = X[, ncol(X)]
X = X[, 1:ncol(X) - 1]
X = confirmMeta(X, mask)
# This function will convert the matrix row-vector into list
matrixToList = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, Matrix[Double] p, String op)
return (List[Unknown] l, Boolean hasY)
hasY = FALSE
hasVerbose = as.scalar(p[1, ncol(p)])
yFlag = as.scalar(p[1, ncol(p) - 1])
l = list(X)
if(yFlag == 1) {
l = append(l, Y)
hasY = TRUE
if(ncol(p) > 2) {
if(op == "mice")
l = append(l, mask)
if(op == "pca") {
ratio = as.scalar(p[1,1])
p[1, 1] = as.integer(ncol(X) - ratio)
for(i in 1:ncol(p)-2)
l = append(l, as.scalar(p[1,i]))
if(hasVerbose == 1)
l = append(l, FALSE)
# print(toString(l, rows=2))
confirmMeta = function(Matrix[Double] X, Matrix[Double] mask)
return (Matrix[Double] X)
if(sum(mask) > 0)
# get the max + 1 for nan replacement
nanMask =
# replace nan
X = replace(target = X, pattern = NaN, replacement = 9999)
# take categorical out
cat = removeEmpty(target=X, margin="cols", select = mask)
# round categorical (if there is any floating point)
cat = ceil(cat)
print("cat less than zero")
print(sum(cat <= 0))
# reconstruct original X
X = X * (mask == 0)
q = table(seq(1, ncol(cat)), removeEmpty(target=seq(1, ncol(mask)), margin="rows",
select=t(mask)), ncol(cat), ncol(X))
X = (cat %*% q) + X
# put nan back
nanMask = replace(target = nanMask, pattern = 1, replacement = NaN)
X = X + nanMask