[SYSTEMDS-2636] Built-in function for univariate statistics
Added univar and bivar builtins, federated tests, fixed fed matrix max
Closes #1035.
diff --git a/scripts/builtin/univar.dml b/scripts/builtin/univar.dml
new file mode 100644
index 0000000..80a4606
--- /dev/null
+++ b/scripts/builtin/univar.dml
@@ -0,0 +1,94 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+#
+# Computes univariate statistics for all attributes in a given data set
+#
+# INPUT PARAMETERS:
+# -------------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# -------------------------------------------------------------------------------------------------
+# X Matrix[Double] --- Input matrix of the shape (N, D)
+# TYPES Matrix[Integer] --- Matrix of the shape (1, D) with features types:
+# 1 for scale, 2 for nominal, 3 for ordinal
+# -------------------------------------------------------------------------------------------------
+# OUTPUT: Matrix of summary statistics
+
+m_univar = function(Matrix[Double] X, Matrix[Double] types)
+return(Matrix[Double] univarStats)
+{
+ max_kind = max(types);
+ N = nrow(X);
+ D = ncol(X);
+
+ # Number of statistics (14 scale, 3 categorical)
+ numBaseStats = 17;
+ univarStats = matrix(0, rows=numBaseStats, cols=D);
+
+ # Compute max domain size among all categorical attributes
+ maxDomain = as.integer(max((types > 1) * colMaxs(X)));
+
+ parfor(i in 1:D, check=0) {
+ F = X[,i];
+
+ type = as.scalar(types[1,i]);
+ minF = min(F);
+ maxF = max(F);
+
+ if (type == 1) {
+ # compute SCALE statistics on the projected column
+ rng = maxF - minF;
+
+ mu = mean(F);
+ m2 = moment(F, 2);
+ m3 = moment(F, 3);
+ m4 = moment(F, 4);
+
+ var = N/(N-1.0)*m2;
+ std_dev = sqrt(var);
+ se = std_dev/sqrt(N);
+ cv = std_dev/mu;
+
+ g1 = m3/(std_dev^3);
+ g2 = m4/(std_dev^4) - 3;
+ se_g1=sqrt( (6/(N-2.0)) * (N/(N+1.0)) * ((N-1.0)/(N+3.0)) );
+ se_g2=sqrt( (4/(N+5.0)) * ((N^2-1)/(N-3.0)) * se_g1^2 );
+
+ md = median(F);
+ iqm = interQuartileMean(F);
+
+ univarStats[1:14,i] = as.matrix(list(minF, maxF, rng,
+ mu, var, std_dev, se, cv, g1, g2, se_g1, se_g2, md, iqm));
+ }
+
+ if (type == 2 | type == 3) {
+ # check if the categorical column has valid values
+ if( minF <= 0 ) {
+ print("ERROR: Categorical attributes can only take values starting from 1. Encountered a value " + minF + " in attribute " + i);
+ }
+
+ # compute CATEGORICAL statistics on the projected column
+ cat_counts = table(F, 1, maxDomain, 1);
+ mode = as.scalar(rowIndexMax(t(cat_counts)));
+ numModes = sum(cat_counts == max(cat_counts));
+ univarStats[15:17,i] = as.matrix(list(maxF, mode, numModes));
+ }
+ }
+}
diff --git a/src/main/java/org/apache/sysds/common/Builtins.java b/src/main/java/org/apache/sysds/common/Builtins.java
index 0a05d15..9e14f50 100644
--- a/src/main/java/org/apache/sysds/common/Builtins.java
+++ b/src/main/java/org/apache/sysds/common/Builtins.java
@@ -191,6 +191,7 @@
COUNT_DISTINCT_APPROX("countDistinctApprox",false),
VAR("var", false),
XOR("xor", false),
+ UNIVAR("univar", true),
WINSORIZE("winsorize", true, false), //TODO parameterize w/ prob, min/max val
//parameterized builtin functions
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
index 7df7c51..bdec97f 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
@@ -47,15 +47,15 @@
public class FederationUtils {
protected static Logger log = Logger.getLogger(FederationUtils.class);
private static final IDSequence _idSeq = new IDSequence();
-
+
public static void resetFedDataID() {
_idSeq.reset();
}
-
+
public static long getNextFedDataID() {
return _idSeq.getNextID();
}
-
+
public static FederatedRequest callInstruction(String inst, CPOperand varOldOut, CPOperand[] varOldIn, long[] varNewIn) {
//TODO better and safe replacement of operand names --> instruction utils
long id = getNextFedDataID();
@@ -85,7 +85,7 @@
throw new DMLRuntimeException(ex);
}
}
-
+
public static MatrixBlock aggMean(Future<FederatedResponse>[] ffr, FederationMap map) {
try {
FederatedRange[] ranges = map.getFederatedRanges();
@@ -108,7 +108,22 @@
throw new DMLRuntimeException(ex);
}
}
-
+
+ public static DoubleObject aggMinMax(Future<FederatedResponse>[] ffr, boolean isMin, boolean isScalar) {
+ try {
+ double res = isMin ? Double.MAX_VALUE : - Double.MAX_VALUE;
+ for (Future<FederatedResponse> fr: ffr){
+ double v = isScalar ? ((ScalarObject)fr.get().getData()[0]).getDoubleValue() :
+ isMin ? ((MatrixBlock) fr.get().getData()[0]).min() : ((MatrixBlock) fr.get().getData()[0]).max();
+ res = isMin ? Math.min(res, v) : Math.max(res, v);
+ }
+ return new DoubleObject(res);
+ }
+ catch (Exception ex) {
+ throw new DMLRuntimeException(ex);
+ }
+ }
+
public static MatrixBlock[] getResults(Future<FederatedResponse>[] ffr) {
try {
MatrixBlock[] ret = new MatrixBlock[ffr.length];
@@ -136,25 +151,19 @@
public static ScalarObject aggScalar(AggregateUnaryOperator aop, Future<FederatedResponse>[] ffr) {
if(!(aop.aggOp.increOp.fn instanceof KahanFunction || (aop.aggOp.increOp.fn instanceof Builtin &&
- (((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
- ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)))) {
+ (((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
+ ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)))) {
throw new DMLRuntimeException("Unsupported aggregation operator: "
- + aop.aggOp.increOp.getClass().getSimpleName());
+ + aop.aggOp.increOp.getClass().getSimpleName());
}
try {
if(aop.aggOp.increOp.fn instanceof Builtin){
// then we know it is a Min or Max based on the previous check.
boolean isMin = ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN;
- double res = isMin ? Double.MAX_VALUE: - Double.MAX_VALUE;
- double v;
- for (Future<FederatedResponse> fr: ffr){
- v = ((ScalarObject)fr.get().getData()[0]).getDoubleValue();
- res = isMin ? Math.min(res, v) : Math.max(res, v);
- }
- return new DoubleObject(res);
- }
- else {
+ return aggMinMax(ffr, isMin, true);
+ }
+ else {
double sum = 0; //uak+
for( Future<FederatedResponse> fr : ffr )
sum += ((ScalarObject)fr.get().getData()[0]).getDoubleValue();
@@ -172,17 +181,22 @@
//independent of aggregation function for row-partitioned federated matrices
return rbind(ffr);
}
-
// handle col aggregate
if( aop.aggOp.increOp.fn instanceof KahanFunction )
return aggAdd(ffr);
else if( aop.aggOp.increOp.fn instanceof Mean )
return aggMean(ffr, map);
+ else if (aop.aggOp.increOp.fn instanceof Builtin &&
+ (((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
+ ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)) {
+ boolean isMin = ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN;
+ return new MatrixBlock(1,1,aggMinMax(ffr, isMin, false).getDoubleValue());
+ }
else
throw new DMLRuntimeException("Unsupported aggregation operator: "
+ aop.aggOp.increOp.fn.getClass().getSimpleName());
}
-
+
public static void waitFor(List<Future<FederatedResponse>> responses) {
try {
for(Future<FederatedResponse> fr : responses)
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedBivarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/FederatedBivarTest.java
new file mode 100644
index 0000000..906beec
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/FederatedBivarTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedBivarTest extends AutomatedTestBase {
+ private final static String TEST_DIR = "functions/federated/";
+ private final static String TEST_NAME = "FederatedBivarTest";
+ private final static String TEST_CLASS_DIR = TEST_DIR + FederatedUnivarTest.class.getSimpleName() + "/";
+ private final static int blocksize = 1024;
+ @Parameterized.Parameter()
+ public int rows;
+ @Parameterized.Parameter(1)
+ public int cols;
+
+ @Override
+ public void setUp() {
+ TestUtils.clearAssertionInformation();
+ addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"}));
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {10000, 10},
+// {2000, 50}, {1000, 10},
+// {10000, 10}, {2000, 50}, {1000, 100}
+ });
+ }
+
+ @Test
+ @Ignore
+ public void federatedBivarSinglenode() {
+ federatedL2SVM(Types.ExecMode.SINGLE_NODE);
+ }
+
+ @Test
+ @Ignore
+ public void federatedBivarHybrid() {
+ federatedL2SVM(Types.ExecMode.HYBRID);
+ }
+
+ public void federatedL2SVM(Types.ExecMode execMode) {
+ Types.ExecMode platformOld = setExecMode(execMode);
+
+ getAndLoadTestConfiguration(TEST_NAME);
+ String HOME = SCRIPT_DIR + TEST_DIR;
+
+ // write input matrices
+ int quarterRows = rows / 4;
+ // We have two matrices handled by a single federated worker
+ double[][] X1 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 3);
+ double[][] X2 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 7);
+ double[][] X3 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 8);
+ double[][] X4 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 9);
+
+ // write types matrix shape of (1, D)
+ double [][] T1 = getRandomMatrix(1, cols, 0, 2, 1, 9);
+ Arrays.stream(T1[0]).forEach(n -> Math.ceil(n));
+
+ double [][] T2 = getRandomMatrix(1, cols, 0, 2, 1, 9);
+ Arrays.stream(T2[0]).forEach(n -> Math.ceil(n));
+
+ double [][] S1 = getRandomMatrix(1, (int) cols/5, 1, cols, 1, 3);
+ Arrays.stream(S1[0]).forEach(n -> Math.ceil(n));
+
+ double [][] S2 = getRandomMatrix(1, (int) cols/4, 1, cols, 1, 9);
+ Arrays.stream(S2[0]).forEach(n -> Math.ceil(n));
+
+ MatrixCharacteristics mc= new MatrixCharacteristics(quarterRows, cols, blocksize, quarterRows * cols);
+ writeInputMatrixWithMTD("X1", X1, false, mc);
+ writeInputMatrixWithMTD("X2", X2, false, mc);
+ writeInputMatrixWithMTD("X3", X3, false, mc);
+ writeInputMatrixWithMTD("X4", X4, false, mc);
+ writeInputMatrixWithMTD("S1", S1, false);
+ writeInputMatrixWithMTD("S2", S2, false);
+ writeInputMatrixWithMTD("T1", T1, false);
+ writeInputMatrixWithMTD("T2", T2, false);
+
+ // empty script name because we don't execute any script, just start the worker
+ fullDMLScriptName = "";
+ int port1 = getRandomAvailablePort();
+ int port2 = getRandomAvailablePort();
+ int port3 = getRandomAvailablePort();
+ int port4 = getRandomAvailablePort();
+ Thread t1 = startLocalFedWorker(port1);
+ Thread t2 = startLocalFedWorker(port2);
+ Thread t3 = startLocalFedWorker(port3);
+ Thread t4 = startLocalFedWorker(port4);
+
+ TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+ loadTestConfiguration(config);
+ setOutputBuffering(false);
+
+ // Run reference dml script with normal matrix
+ fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+ programArgs = new String[] {"-stats", "-args", input("X1"), input("X2"), input("X3"), input("X4"), input("S1"), input("S2"), input("T1"), input("T2"), expected("B")};
+ runTest(true, false, null, -1);
+
+ // Run actual dml script with federated matrix
+ fullDMLScriptName = HOME + TEST_NAME + ".dml";
+ programArgs = new String[] {"-stats", "-nvargs",
+ "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+ "in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
+ "in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
+ "in_X4=" + TestUtils.federatedAddress(port4, input("X4")),
+ "in_S1=" + input("S1"),
+ "in_S2=" + input("S2"),
+ "in_T1=" + input("T1"),
+ "in_T2=" + input("T2"),
+ "rows=" + rows, "cols=" + cols,
+ "out=" + output("B")};
+ runTest(true, false, null, -1);
+
+ // compare via files
+// compareResults(1e-9);
+ TestUtils.shutdownThreads(t1, t2, t3, t4);
+
+ // check for federated operations
+// Assert.assertTrue(heavyHittersContainsString("fed_ba+*"));
+// Assert.assertTrue(heavyHittersContainsString("fed_uack+"));
+// Assert.assertTrue(heavyHittersContainsString("fed_tsmm"));
+// if( scaleAndShift ) {
+// Assert.assertTrue(heavyHittersContainsString("fed_uacsqk+"));
+// Assert.assertTrue(heavyHittersContainsString("fed_uacmean"));
+// Assert.assertTrue(heavyHittersContainsString("fed_-"));
+// Assert.assertTrue(heavyHittersContainsString("fed_/"));
+// Assert.assertTrue(heavyHittersContainsString("fed_replace"));
+// }
+
+ //check that federated input files are still existing
+ Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+ Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+ Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+ Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+ Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("S1")));
+ Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("S2")));
+ Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("T1")));
+ Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("T2")));
+
+ resetExecMode(platformOld);
+ }
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedUnivarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/FederatedUnivarTest.java
new file mode 100644
index 0000000..a704d3a
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/FederatedUnivarTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedUnivarTest extends AutomatedTestBase {
+ private final static String TEST_DIR = "functions/federated/";
+ private final static String TEST_NAME = "FederatedUnivarTest";
+ private final static String TEST_CLASS_DIR = TEST_DIR + FederatedUnivarTest.class.getSimpleName() + "/";
+
+ private final static int blocksize = 1024;
+ @Parameterized.Parameter()
+ public int rows;
+ @Parameterized.Parameter(1)
+ public int cols;
+
+ @Override
+ public void setUp() {
+ TestUtils.clearAssertionInformation();
+ addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"}));
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {10000, 16},
+ {2000, 32}, {1000, 64}, {10000, 128}
+ });
+ }
+
+ @Test
+ public void federatedUnivarSinglenode() {
+ federatedL2SVM(Types.ExecMode.SINGLE_NODE);
+ }
+
+ @Test
+ public void federatedUnivarHybrid() {
+ federatedL2SVM(Types.ExecMode.HYBRID);
+ }
+
+ public void federatedL2SVM(Types.ExecMode execMode) {
+ Types.ExecMode platformOld = setExecMode(execMode);
+
+ getAndLoadTestConfiguration(TEST_NAME);
+ String HOME = SCRIPT_DIR + TEST_DIR;
+
+ // write input matrices
+ int quarterCols = cols / 4;
+ // We have two matrices handled by a single federated worker
+ double[][] X1 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 3);
+ double[][] X2 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 7);
+ double[][] X3 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 8);
+ double[][] X4 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 9);
+
+ // write types matrix shape of (1, D)
+ double [][] Y = getRandomMatrix(1, cols, 0, 3, 1, 9);
+ Arrays.stream(Y[0]).forEach(Math::ceil);
+
+ MatrixCharacteristics mc= new MatrixCharacteristics(rows, quarterCols, blocksize, rows * quarterCols);
+ writeInputMatrixWithMTD("X1", X1, false, mc);
+ writeInputMatrixWithMTD("X2", X2, false, mc);
+ writeInputMatrixWithMTD("X3", X3, false, mc);
+ writeInputMatrixWithMTD("X4", X4, false, mc);
+ writeInputMatrixWithMTD("Y", Y, false);
+
+ // empty script name because we don't execute any script, just start the worker
+ fullDMLScriptName = "";
+ int port1 = getRandomAvailablePort();
+ int port2 = getRandomAvailablePort();
+ int port3 = getRandomAvailablePort();
+ int port4 = getRandomAvailablePort();
+ Thread t1 = startLocalFedWorker(port1);
+ Thread t2 = startLocalFedWorker(port2);
+ Thread t3 = startLocalFedWorker(port3);
+ Thread t4 = startLocalFedWorker(port4);
+
+ TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+ loadTestConfiguration(config);
+ setOutputBuffering(false);
+
+ // Run reference dml script with normal matrix
+ fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+ programArgs = new String[] {"-stats", "100", "-args", input("X1"), input("X2"), input("X3"), input("X4"), input("Y"), expected("B")};
+ runTest(true, false, null, -1);
+
+ // Run actual dml script with federated matrix
+ fullDMLScriptName = HOME + TEST_NAME + ".dml";
+ programArgs = new String[] {"-stats", "100", "-nvargs",
+ "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+ "in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
+ "in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
+ "in_X4=" + TestUtils.federatedAddress(port4, input("X4")),
+ "in_Y=" + input("Y"), // types
+ "rows=" + rows, "cols=" + cols,
+ "out=" + output("B")};
+ runTest(true, false, null, -1);
+
+ // compare via files
+ compareResults(1e-9);
+ TestUtils.shutdownThreads(t1, t2, t3, t4);
+
+ // check for federated operations
+ Assert.assertTrue(heavyHittersContainsString("fed_uacmax"));
+
+ //check that federated input files are still existing
+ Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+ Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+ Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+ Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+ Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("Y")));
+
+ resetExecMode(platformOld);
+ }
+}
diff --git a/src/test/scripts/functions/federated/FederatedBivarTest.dml b/src/test/scripts/functions/federated/FederatedBivarTest.dml
new file mode 100644
index 0000000..94b197a
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedBivarTest.dml
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+S1 = read($in_S1);
+S2 = read($in_S2);
+T1 = read($in_T1);
+T2 = read($in_T2);
+B = bivar(X=X, S1=S1, S2=S2, T1=T1, T2=T2);
+write(B, $out);
+
diff --git a/src/test/scripts/functions/federated/FederatedBivarTestReference.dml b/src/test/scripts/functions/federated/FederatedBivarTestReference.dml
new file mode 100644
index 0000000..b28347c
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedBivarTestReference.dml
@@ -0,0 +1,28 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rbind(read($1), read($2), read($3), read($4));
+S1 = read($5);
+S2 = read($6);
+T1 = read($7);
+T2 = read($8);
+B = bivar(X=X, S1=S1, S2=S2, T1=T1, T2=T2);
+write(B, $9);
\ No newline at end of file
diff --git a/src/test/scripts/functions/federated/FederatedUnivarTest.dml b/src/test/scripts/functions/federated/FederatedUnivarTest.dml
new file mode 100644
index 0000000..443c0c1
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedUnivarTest.dml
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+#X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+# ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+# list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+
+X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2), list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+Y = read($in_Y);
+B = univar(X=X, types=Y);
+write(B, $out);
+
diff --git a/src/test/scripts/functions/federated/FederatedUnivarTestReference.dml b/src/test/scripts/functions/federated/FederatedUnivarTestReference.dml
new file mode 100644
index 0000000..46072e7
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedUnivarTestReference.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+#X = rbind(read($1), read($2), read($3), read($4));
+X = cbind(read($1), read($2), read($3), read($4));
+types = read($5);
+B = univar(X=X, types=types);
+write(B, $6);