[SYSTEMDS-2636] Built-in function for univariate statistics

Added univar and bivar builtins, federated tests, fixed fed matrix max

Closes #1035.
diff --git a/scripts/builtin/univar.dml b/scripts/builtin/univar.dml
new file mode 100644
index 0000000..80a4606
--- /dev/null
+++ b/scripts/builtin/univar.dml
@@ -0,0 +1,94 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+#
+# Computes univariate statistics for all attributes in a given data set
+#
+# INPUT PARAMETERS:
+# -------------------------------------------------------------------------------------------------
+# NAME           TYPE               DEFAULT  MEANING
+# -------------------------------------------------------------------------------------------------
+# X              Matrix[Double]     ---      Input matrix of the shape (N, D)
+# TYPES          Matrix[Integer]    ---      Matrix of the shape (1, D) with features types:
+#                                            1 for scale, 2 for nominal, 3 for ordinal
+# -------------------------------------------------------------------------------------------------
+# OUTPUT: Matrix of summary statistics
+
+m_univar = function(Matrix[Double] X, Matrix[Double] types)
+return(Matrix[Double] univarStats)
+{
+  max_kind = max(types);
+  N = nrow(X);
+  D = ncol(X);
+
+  # Number of statistics (14 scale, 3 categorical)
+  numBaseStats = 17;
+  univarStats = matrix(0, rows=numBaseStats, cols=D);
+
+  # Compute max domain size among all categorical attributes
+  maxDomain = as.integer(max((types > 1) * colMaxs(X)));
+
+  parfor(i in 1:D, check=0) {
+    F = X[,i];
+
+    type = as.scalar(types[1,i]);
+    minF = min(F);
+    maxF = max(F);
+
+    if (type == 1) {
+      # compute SCALE statistics on the projected column
+      rng = maxF - minF;
+
+      mu = mean(F);
+      m2 = moment(F, 2);
+      m3 = moment(F, 3);
+      m4 = moment(F, 4);
+
+      var = N/(N-1.0)*m2;
+      std_dev = sqrt(var);
+      se = std_dev/sqrt(N);
+      cv = std_dev/mu;
+
+      g1 = m3/(std_dev^3);
+      g2 = m4/(std_dev^4) - 3;
+      se_g1=sqrt( (6/(N-2.0)) * (N/(N+1.0)) * ((N-1.0)/(N+3.0)) );
+      se_g2=sqrt( (4/(N+5.0)) * ((N^2-1)/(N-3.0)) * se_g1^2 );
+
+      md = median(F);
+      iqm = interQuartileMean(F);
+
+      univarStats[1:14,i] = as.matrix(list(minF, maxF, rng,
+        mu, var, std_dev, se, cv, g1, g2, se_g1, se_g2, md, iqm));
+    }
+
+    if (type == 2 | type == 3) {
+      # check if the categorical column has valid values
+      if( minF <= 0 ) {
+        print("ERROR: Categorical attributes can only take values starting from 1. Encountered a value " + minF + " in attribute " + i);
+      }
+
+      # compute CATEGORICAL statistics on the projected column
+      cat_counts = table(F, 1, maxDomain, 1);
+      mode = as.scalar(rowIndexMax(t(cat_counts)));
+      numModes = sum(cat_counts == max(cat_counts));
+      univarStats[15:17,i] = as.matrix(list(maxF, mode, numModes));
+    }
+  }
+}
diff --git a/src/main/java/org/apache/sysds/common/Builtins.java b/src/main/java/org/apache/sysds/common/Builtins.java
index 0a05d15..9e14f50 100644
--- a/src/main/java/org/apache/sysds/common/Builtins.java
+++ b/src/main/java/org/apache/sysds/common/Builtins.java
@@ -191,6 +191,7 @@
 	COUNT_DISTINCT_APPROX("countDistinctApprox",false),
 	VAR("var", false),
 	XOR("xor", false),
+	UNIVAR("univar", true),
 	WINSORIZE("winsorize", true, false), //TODO parameterize w/ prob, min/max val
 
 	//parameterized builtin functions
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
index 7df7c51..bdec97f 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
@@ -47,15 +47,15 @@
 public class FederationUtils {
 	protected static Logger log = Logger.getLogger(FederationUtils.class);
 	private static final IDSequence _idSeq = new IDSequence();
-	
+
 	public static void resetFedDataID() {
 		_idSeq.reset();
 	}
-	
+
 	public static long getNextFedDataID() {
 		return _idSeq.getNextID();
 	}
-	
+
 	public static FederatedRequest callInstruction(String inst, CPOperand varOldOut, CPOperand[] varOldIn, long[] varNewIn) {
 		//TODO better and safe replacement of operand names --> instruction utils
 		long id = getNextFedDataID();
@@ -85,7 +85,7 @@
 			throw new DMLRuntimeException(ex);
 		}
 	}
-	
+
 	public static MatrixBlock aggMean(Future<FederatedResponse>[] ffr, FederationMap map) {
 		try {
 			FederatedRange[] ranges = map.getFederatedRanges();
@@ -108,7 +108,22 @@
 			throw new DMLRuntimeException(ex);
 		}
 	}
-	
+
+	public static DoubleObject aggMinMax(Future<FederatedResponse>[] ffr, boolean isMin, boolean isScalar) {
+		try {
+			double res = isMin ? Double.MAX_VALUE : - Double.MAX_VALUE;
+			for (Future<FederatedResponse> fr: ffr){
+				double v = isScalar ? ((ScalarObject)fr.get().getData()[0]).getDoubleValue() :
+					isMin ? ((MatrixBlock) fr.get().getData()[0]).min() : ((MatrixBlock) fr.get().getData()[0]).max();
+				res = isMin ? Math.min(res, v) : Math.max(res, v);
+			}
+			return new DoubleObject(res);
+		}
+		catch (Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+	}
+
 	public static MatrixBlock[] getResults(Future<FederatedResponse>[] ffr) {
 		try {
 			MatrixBlock[] ret = new MatrixBlock[ffr.length];
@@ -136,25 +151,19 @@
 
 	public static ScalarObject aggScalar(AggregateUnaryOperator aop, Future<FederatedResponse>[] ffr) {
 		if(!(aop.aggOp.increOp.fn instanceof KahanFunction || (aop.aggOp.increOp.fn instanceof Builtin &&
-			(((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
-				((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)))) {
+				(((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
+						((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)))) {
 			throw new DMLRuntimeException("Unsupported aggregation operator: "
-				+ aop.aggOp.increOp.getClass().getSimpleName());
+					+ aop.aggOp.increOp.getClass().getSimpleName());
 		}
 
 		try {
 			if(aop.aggOp.increOp.fn instanceof Builtin){
 				// then we know it is a Min or Max based on the previous check.
 				boolean isMin = ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN;
-				double res = isMin ? Double.MAX_VALUE: - Double.MAX_VALUE;
-				double v;
-				for (Future<FederatedResponse> fr: ffr){
-					v = ((ScalarObject)fr.get().getData()[0]).getDoubleValue();
-					res = isMin ? Math.min(res, v) : Math.max(res, v);
-				}
-				return new DoubleObject(res);
-			} 
-			else {		
+				return aggMinMax(ffr, isMin, true);
+			}
+			else {
 				double sum = 0; //uak+
 				for( Future<FederatedResponse> fr : ffr )
 					sum += ((ScalarObject)fr.get().getData()[0]).getDoubleValue();
@@ -172,17 +181,22 @@
 			//independent of aggregation function for row-partitioned federated matrices
 			return rbind(ffr);
 		}
-		
 		// handle col aggregate
 		if( aop.aggOp.increOp.fn instanceof KahanFunction )
 			return aggAdd(ffr);
 		else if( aop.aggOp.increOp.fn instanceof Mean )
 			return aggMean(ffr, map);
+		else if (aop.aggOp.increOp.fn instanceof Builtin &&
+			(((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
+			((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)) {
+			boolean isMin = ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN;
+			return new MatrixBlock(1,1,aggMinMax(ffr, isMin, false).getDoubleValue());
+		}
 		else
 			throw new DMLRuntimeException("Unsupported aggregation operator: "
 				+ aop.aggOp.increOp.fn.getClass().getSimpleName());
 	}
-	
+
 	public static void waitFor(List<Future<FederatedResponse>> responses) {
 		try {
 			for(Future<FederatedResponse> fr : responses)
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedBivarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/FederatedBivarTest.java
new file mode 100644
index 0000000..906beec
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/FederatedBivarTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedBivarTest extends AutomatedTestBase {
+	private final static String TEST_DIR = "functions/federated/";
+	private final static String TEST_NAME = "FederatedBivarTest";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedUnivarTest.class.getSimpleName() + "/";
+	private final static int blocksize = 1024;
+	@Parameterized.Parameter()
+	public int rows;
+	@Parameterized.Parameter(1)
+	public int cols;
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"}));
+	}
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		return Arrays.asList(new Object[][] {
+				{10000, 10},
+//				{2000, 50}, {1000, 10},
+//				{10000, 10}, {2000, 50}, {1000, 100}
+		});
+	}
+
+	@Test
+	@Ignore
+	public void federatedBivarSinglenode() {
+		federatedL2SVM(Types.ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	@Ignore
+	public void federatedBivarHybrid() {
+		federatedL2SVM(Types.ExecMode.HYBRID);
+	}
+
+	public void federatedL2SVM(Types.ExecMode execMode) {
+		Types.ExecMode platformOld = setExecMode(execMode);
+
+		getAndLoadTestConfiguration(TEST_NAME);
+		String HOME = SCRIPT_DIR + TEST_DIR;
+
+		// write input matrices
+		int quarterRows = rows / 4;
+		// We have two matrices handled by a single federated worker
+		double[][] X1 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 3);
+		double[][] X2 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 7);
+		double[][] X3 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 8);
+		double[][] X4 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 9);
+
+		// write types matrix shape of (1, D)
+		double [][] T1 = getRandomMatrix(1, cols, 0, 2, 1, 9);
+		Arrays.stream(T1[0]).forEach(n -> Math.ceil(n));
+
+		double [][] T2 = getRandomMatrix(1, cols, 0, 2, 1, 9);
+		Arrays.stream(T2[0]).forEach(n -> Math.ceil(n));
+
+		double [][] S1 = getRandomMatrix(1, (int) cols/5, 1, cols, 1, 3);
+		Arrays.stream(S1[0]).forEach(n -> Math.ceil(n));
+
+		double [][] S2 = getRandomMatrix(1, (int) cols/4, 1, cols, 1, 9);
+		Arrays.stream(S2[0]).forEach(n -> Math.ceil(n));
+
+		MatrixCharacteristics mc= new MatrixCharacteristics(quarterRows, cols, blocksize, quarterRows * cols);
+		writeInputMatrixWithMTD("X1", X1, false, mc);
+		writeInputMatrixWithMTD("X2", X2, false, mc);
+		writeInputMatrixWithMTD("X3", X3, false, mc);
+		writeInputMatrixWithMTD("X4", X4, false, mc);
+		writeInputMatrixWithMTD("S1", S1, false);
+		writeInputMatrixWithMTD("S2", S2, false);
+		writeInputMatrixWithMTD("T1", T1, false);
+		writeInputMatrixWithMTD("T2", T2, false);
+
+		// empty script name because we don't execute any script, just start the worker
+		fullDMLScriptName = "";
+		int port1 = getRandomAvailablePort();
+		int port2 = getRandomAvailablePort();
+		int port3 = getRandomAvailablePort();
+		int port4 = getRandomAvailablePort();
+		Thread t1 = startLocalFedWorker(port1);
+		Thread t2 = startLocalFedWorker(port2);
+		Thread t3 = startLocalFedWorker(port3);
+		Thread t4 = startLocalFedWorker(port4);
+
+		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+		loadTestConfiguration(config);
+		setOutputBuffering(false);
+
+		// Run reference dml script with normal matrix
+		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+		programArgs = new String[] {"-stats", "-args", input("X1"), input("X2"), input("X3"), input("X4"), input("S1"), input("S2"), input("T1"), input("T2"), expected("B")};
+		runTest(true, false, null, -1);
+
+		// Run actual dml script with federated matrix
+		fullDMLScriptName = HOME + TEST_NAME + ".dml";
+		programArgs = new String[] {"-stats", "-nvargs",
+			"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+			"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
+			"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
+			"in_X4=" + TestUtils.federatedAddress(port4, input("X4")),
+			"in_S1=" + input("S1"),
+			"in_S2=" + input("S2"),
+			"in_T1=" + input("T1"),
+			"in_T2=" + input("T2"),
+			"rows=" + rows, "cols=" + cols,
+			"out=" + output("B")};
+		runTest(true, false, null, -1);
+
+		// compare via files
+//		compareResults(1e-9);
+		TestUtils.shutdownThreads(t1, t2, t3, t4);
+
+		// check for federated operations
+//		Assert.assertTrue(heavyHittersContainsString("fed_ba+*"));
+//		Assert.assertTrue(heavyHittersContainsString("fed_uack+"));
+//		Assert.assertTrue(heavyHittersContainsString("fed_tsmm"));
+//		if( scaleAndShift ) {
+//			Assert.assertTrue(heavyHittersContainsString("fed_uacsqk+"));
+//			Assert.assertTrue(heavyHittersContainsString("fed_uacmean"));
+//			Assert.assertTrue(heavyHittersContainsString("fed_-"));
+//			Assert.assertTrue(heavyHittersContainsString("fed_/"));
+//			Assert.assertTrue(heavyHittersContainsString("fed_replace"));
+//		}
+
+		//check that federated input files are still existing
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("S1")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("S2")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("T1")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("T2")));
+
+		resetExecMode(platformOld);
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedUnivarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/FederatedUnivarTest.java
new file mode 100644
index 0000000..a704d3a
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/FederatedUnivarTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedUnivarTest extends AutomatedTestBase {
+	private final static String TEST_DIR = "functions/federated/";
+	private final static String TEST_NAME = "FederatedUnivarTest";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedUnivarTest.class.getSimpleName() + "/";
+
+	private final static int blocksize = 1024;
+	@Parameterized.Parameter()
+	public int rows;
+	@Parameterized.Parameter(1)
+	public int cols;
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"}));
+	}
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		return Arrays.asList(new Object[][] {
+				{10000, 16},
+				{2000, 32}, {1000, 64}, {10000, 128}
+		});
+	}
+
+	@Test
+	public void federatedUnivarSinglenode() {
+		federatedL2SVM(Types.ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	public void federatedUnivarHybrid() {
+		federatedL2SVM(Types.ExecMode.HYBRID);
+	}
+
+	public void federatedL2SVM(Types.ExecMode execMode) {
+		Types.ExecMode platformOld = setExecMode(execMode);
+
+		getAndLoadTestConfiguration(TEST_NAME);
+		String HOME = SCRIPT_DIR + TEST_DIR;
+
+		// write input matrices
+		int quarterCols = cols / 4;
+		// We have two matrices handled by a single federated worker
+		double[][] X1 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 3);
+		double[][] X2 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 7);
+		double[][] X3 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 8);
+		double[][] X4 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 9);
+
+		// write types matrix shape of (1, D)
+		double [][] Y = getRandomMatrix(1, cols, 0, 3, 1, 9);
+		Arrays.stream(Y[0]).forEach(Math::ceil);
+
+		MatrixCharacteristics mc= new MatrixCharacteristics(rows, quarterCols, blocksize, rows * quarterCols);
+		writeInputMatrixWithMTD("X1", X1, false, mc);
+		writeInputMatrixWithMTD("X2", X2, false, mc);
+		writeInputMatrixWithMTD("X3", X3, false, mc);
+		writeInputMatrixWithMTD("X4", X4, false, mc);
+		writeInputMatrixWithMTD("Y", Y, false);
+
+		// empty script name because we don't execute any script, just start the worker
+		fullDMLScriptName = "";
+		int port1 = getRandomAvailablePort();
+		int port2 = getRandomAvailablePort();
+		int port3 = getRandomAvailablePort();
+		int port4 = getRandomAvailablePort();
+		Thread t1 = startLocalFedWorker(port1);
+		Thread t2 = startLocalFedWorker(port2);
+		Thread t3 = startLocalFedWorker(port3);
+		Thread t4 = startLocalFedWorker(port4);
+
+		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+		loadTestConfiguration(config);
+		setOutputBuffering(false);
+
+		// Run reference dml script with normal matrix
+		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+		programArgs = new String[] {"-stats", "100", "-args", input("X1"), input("X2"), input("X3"), input("X4"), input("Y"), expected("B")};
+		runTest(true, false, null, -1);
+
+		// Run actual dml script with federated matrix
+		fullDMLScriptName = HOME + TEST_NAME + ".dml";
+		programArgs = new String[] {"-stats",  "100", "-nvargs",
+			"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+			"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
+			"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
+			"in_X4=" + TestUtils.federatedAddress(port4, input("X4")),
+			"in_Y=" + input("Y"), // types
+			"rows=" + rows, "cols=" + cols,
+			"out=" + output("B")};
+		runTest(true, false, null, -1);
+
+		// compare via files
+		compareResults(1e-9);
+		TestUtils.shutdownThreads(t1, t2, t3, t4);
+
+		// check for federated operations
+		Assert.assertTrue(heavyHittersContainsString("fed_uacmax"));
+
+		//check that federated input files are still existing
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("Y")));
+
+		resetExecMode(platformOld);
+	}
+}
diff --git a/src/test/scripts/functions/federated/FederatedBivarTest.dml b/src/test/scripts/functions/federated/FederatedBivarTest.dml
new file mode 100644
index 0000000..94b197a
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedBivarTest.dml
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+    ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+S1 = read($in_S1);
+S2 = read($in_S2);
+T1 = read($in_T1);
+T2 = read($in_T2);
+B = bivar(X=X, S1=S1, S2=S2, T1=T1, T2=T2);
+write(B, $out);
+
diff --git a/src/test/scripts/functions/federated/FederatedBivarTestReference.dml b/src/test/scripts/functions/federated/FederatedBivarTestReference.dml
new file mode 100644
index 0000000..b28347c
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedBivarTestReference.dml
@@ -0,0 +1,28 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rbind(read($1), read($2), read($3), read($4));
+S1 = read($5);
+S2 = read($6);
+T1 = read($7);
+T2 = read($8);
+B = bivar(X=X, S1=S1, S2=S2, T1=T1, T2=T2);
+write(B, $9);
\ No newline at end of file
diff --git a/src/test/scripts/functions/federated/FederatedUnivarTest.dml b/src/test/scripts/functions/federated/FederatedUnivarTest.dml
new file mode 100644
index 0000000..443c0c1
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedUnivarTest.dml
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+#X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+#    ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+#		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+
+X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+    ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2), list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+Y = read($in_Y);
+B = univar(X=X, types=Y);
+write(B, $out);
+
diff --git a/src/test/scripts/functions/federated/FederatedUnivarTestReference.dml b/src/test/scripts/functions/federated/FederatedUnivarTestReference.dml
new file mode 100644
index 0000000..46072e7
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedUnivarTestReference.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+#X = rbind(read($1), read($2), read($3), read($4));
+X = cbind(read($1), read($2), read($3), read($4));
+types = read($5);
+B = univar(X=X, types=types);
+write(B, $6);