[SYSTEMDS-2681] Federated Bivariate Statistics

SYSTEMDS 2543-2544 Federated Aggregations:

- Federated Min + Max col and row aggregation
- Federated mean and sum aggregations

Closes #1040
diff --git a/scripts/builtin/bivar.dml b/scripts/builtin/bivar.dml
new file mode 100644
index 0000000..e05f483
--- /dev/null
+++ b/scripts/builtin/bivar.dml
@@ -0,0 +1,324 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+#
+# For a given pair of attribute sets, compute bivariate statistics between all attribute pairs.
+# Given, index1 = {A_11, A_12, ... A_1m} and index2 = {A_21, A_22, ... A_2n}
+# compute bivariate stats for m*n pairs (A_1i, A_2j), (1<= i <=m) and (1<= j <=n).
+#
+# INPUT PARAMETERS:
+# -------------------------------------------------------------------------------------------------
+# NAME           TYPE               DEFAULT  MEANING
+# -------------------------------------------------------------------------------------------------
+# X              Matrix[Double]     ---      Input matrix
+# S1             Matrix[Integer]    ---      First attribute set {A_11, A_12, ... A_1m}
+# S2             Matrix[Integer]    ---      Second attribute set {A_21, A_22, ... A_2n}
+# T1             Matrix[Integer]    ---      Kind for attributes in S1
+#                                      (kind=1 for scale, kind=2 for nominal, kind=3 for ordinal)
+# verbose        Boolean            ---      Print bivar stats
+# -------------------------------------------------------------------------------------------------
+# OUTPUT: Four matrices with bivar stats
+#-------------------------------------------------------------
+
+m_bivar = function(Matrix[Double] X, Matrix[Double] S1, Matrix[Double] S2, Matrix[Double] T1, Matrix[Double] T2, Boolean verbose)
+  return (Matrix[Double] basestats_scale_scale, Matrix[Double] basestats_nominal_scale, Matrix[Double] basestats_nominal_nominal, Matrix[Double] basestats_ordinal_ordinal)
+{
+  s1size = ncol(S1);
+  s2size = ncol(S2);
+  numPairs = s1size * s2size;
+
+  #test: 1 is Pearson'R, 2 is F-test, 3 is chi-squared, 4 is Spearman'sRho
+  # R, (chisq, df, pval, cramersv,) spearman, eta, anovaf, feature_col_index1, feature_col_index2, test
+
+  num_scale_scale_tests = 0
+  num_nominal_nominal_tests = 0
+  num_ordinal_ordinal_tests = 0
+  num_nominal_scale_tests = 0
+
+  pair2row = matrix(0, rows=numPairs, cols=2)
+  for( i in 1:s1size, check=0) {
+    pre_a1 = as.scalar(S1[1,i]);
+    pre_t1 = as.scalar(T1[1,i]);
+
+    for( j in 1:s2size, check=0) {
+      pre_pairID = (i-1)*s2size+j;
+      pre_a2 = as.scalar(S2[1,j]);
+      pre_t2 = as.scalar(T2[1,j]);
+
+      if (pre_t1 == pre_t2) {
+        if (pre_t1 == 1) {
+          num_scale_scale_tests = num_scale_scale_tests + 1
+          pair2row[pre_pairID,1] = num_scale_scale_tests
+        }
+        else {
+          num_nominal_nominal_tests = num_nominal_nominal_tests + 1
+          pair2row[pre_pairID,1] = num_nominal_nominal_tests
+          if ( pre_t1 == 3 ) {
+            num_ordinal_ordinal_tests = num_ordinal_ordinal_tests + 1
+            pair2row[pre_pairID, 2] = num_ordinal_ordinal_tests
+          }
+        }
+      }
+      else {
+        if (pre_t1 == 1 | pre_t2 == 1) {
+          num_nominal_scale_tests = num_nominal_scale_tests + 1
+          pair2row[pre_pairID,1] = num_nominal_scale_tests
+        }
+        else {
+          num_nominal_nominal_tests = num_nominal_nominal_tests + 1
+          pair2row[pre_pairID,1] = num_nominal_nominal_tests
+        }
+      }
+    }
+  }
+
+  size_scale_scale_tests = max(num_scale_scale_tests, 1);
+  size_nominal_nominal_tests = max(num_nominal_nominal_tests, 1)
+  size_ordinal_ordinal_tests = max(num_ordinal_ordinal_tests, 1);
+  size_nominal_scale_tests = max(num_nominal_scale_tests, 1);
+
+  basestats = matrix(0, rows=11, cols=numPairs);
+  basestats_scale_scale = matrix(0, rows=6, cols=size_scale_scale_tests)
+  basestats_nominal_nominal = matrix(0, rows=6, cols=size_nominal_nominal_tests)
+  basestats_ordinal_ordinal = matrix(0, rows=3, cols=size_ordinal_ordinal_tests)
+  basestats_nominal_scale = matrix(0, rows=11, cols=size_nominal_scale_tests)
+
+
+  # Compute max domain size among all categorical attributes
+  # and check if these cols have been recoded
+
+  debug_str = "Stopping execution of DML script due to invalid input";
+  error_flag = FALSE;
+  maxs = colMaxs(X);
+  mins = colMins(X)
+  maxDomainSize = -1.0;
+  for(k in 1:ncol(T1) ) {
+    type = as.scalar(T1[1,k]);
+    if ( type > 1) {
+      colID = as.scalar(S1[1,k]);
+      colMaximum = as.scalar(maxs[1,colID]);
+      #colMaximum = max(X[,colID]);
+      if(maxDomainSize < colMaximum) maxDomainSize = colMaximum;
+      colMinimum = as.scalar(mins[1,colID]);
+      #colMinimum = min(X[,colID]);
+      if(colMinimum < 1){
+        debug_str = ifelse(type == 2,
+             append(debug_str, "Column " + colID + " was declared as nominal but its minimum value is " + colMinimum),
+             append(debug_str, "Column " + colID + " was declared as ordinal but its minimum value is " + colMinimum));
+        error_flag = TRUE;
+      }
+    }
+  }
+
+  for(k in 1:ncol(T2) ) {
+    type = as.scalar(T2[1,k]);
+    if ( type > 1) {
+      colID = as.scalar(T2[1,k]);
+      colMaximum = as.scalar(maxs[1,colID]);
+      #colMaximum = max(X[,colID]);
+      maxDomainSize = max(maxDomainSize, colMaximum);
+      colMinimum = as.scalar(mins[1,colID]);
+      #colMinimum = min(X[,colID]);
+      if(colMinimum < 1){
+        debug_str = ifelse(type == 2,
+             append(debug_str, "Column " + colID + " was declared as nominal but its minimum value is " + colMinimum),
+              append(debug_str, "Column " + colID + " was declared as ordinal but its minimum value is " + colMinimum));
+        error_flag = TRUE;
+      }
+    }
+  }
+  maxDomain = as.integer(maxDomainSize);
+  if(error_flag)
+    stop(debug_str);
+
+  parfor( i in 1:s1size, check=0) {
+    a1 = as.scalar(S1[1,i]);
+    k1 = as.scalar(T1[1,i]);
+    A1 = X[,a1];
+    parfor( j in 1:s2size, check=0) {
+      pairID = (i-1)*s2size+j;
+      a2 = as.scalar(S2[1,j]);
+      k2 = as.scalar(T2[1,j]);
+      A2 = X[,a2];
+      rowid1 = as.scalar(pair2row[pairID, 1])
+      rowid2 = as.scalar(pair2row[pairID, 2])
+
+      if (k1 == k2) {
+        if (k1 == 1) {
+          # scale-scale
+          if (verbose == TRUE) print("[" + i + "," + j + "] scale-scale");
+          [r, cov, sigma1, sigma2] = bivar_ss(A1,A2);
+          basestats_scale_scale[1:6,rowid1] = as.matrix(list(a1,a2,r,cov,sigma1,sigma2));
+        }
+        else {
+          # nominal-nominal or ordinal-ordinal
+          if (verbose == TRUE) print("[" + i + "," + j + "] categorical-categorical");
+          [chisq, df, pval, cramersv]  = bivar_cc(A1, A2, maxDomain);
+          basestats_nominal_nominal[1:6,rowid1] = as.matrix(list(a1,a2,chisq,df,pval,cramersv));
+          if ( k1 == 3 ) {
+            # ordinal-ordinal
+            if (verbose == TRUE) print("[" + i + "," + j + "] ordinal-ordinal");
+            sp = bivar_oo(A1, A2, maxDomain);
+            basestats_ordinal_ordinal[1:3,rowid2] = as.matrix(list(a1,a2,sp));
+          }
+        }
+      }
+      else if (k1 == 1 | k2 == 1) {
+        # Scale-nominal/ordinal
+        if (verbose == TRUE) print("[" + i + "," + j + "] scale-categorical");
+        if ( k1 == 1 )
+          [eta, f, pval, bw_ss, within_ss, bw_df, within_df, bw_mean_square, within_mean_square] = bivar_sc(A1, A2, maxDomain);
+        else
+          [eta, f, pval, bw_ss, within_ss, bw_df, within_df, bw_mean_square, within_mean_square] = bivar_sc(A2, A1, maxDomain);
+        basestats_nominal_scale[1:11,rowid1] = as.matrix(list(a1,a2,eta,f,pval,bw_ss,within_ss,bw_df,within_df,bw_mean_square,within_mean_square));
+      }
+      else {
+        # nominal-ordinal or ordinal-nominal
+        if (verbose == TRUE) print("[" + i + "," + j + "] categorical-categorical");
+        [chisq, df, pval, cramersv]  = bivar_cc(A1, A2, maxDomain);
+        basestats_nominal_nominal[1:6,rowid1] = as.matrix(list(a1,a2,chisq,df,pval,cramersv));
+      }
+    }
+  }
+}
+
+
+bivar_cc = function(Matrix[Double] A, Matrix[Double] B, Double maxDomain)
+  return (Double chisq, Double df, Double pval, Double cramersv)
+{
+  # Contingency Table
+  F = table(A, B, maxDomain, maxDomain);
+  F = F[1:max(A), 1:max(B)];
+
+  # Chi-Squared
+  W = sum(F);
+  r = rowSums(F);
+  c = colSums(F);
+  E = (r %*% c)/W;
+  T = (F-E)^2/E;
+  chi_squared = sum(T);
+
+  # compute p-value
+  degFreedom = (nrow(F)-1)*(ncol(F)-1);
+  pValue = pchisq(target=chi_squared, df=degFreedom, lower.tail=FALSE);
+
+  # Cramer's V
+  R = nrow(F);
+  C = ncol(F);
+  q = min(R,C);
+  cramers_v = sqrt(chi_squared/(W*(q-1)));
+
+  # Assign return values
+  chisq = chi_squared;
+  df = as.double(degFreedom);
+  pval = pValue;
+  cramersv = cramers_v;
+}
+
+
+bivar_ss = function(Matrix[Double] X, Matrix[Double] Y)
+  return (Double R, Double covXY, Double sigmaX, Double sigmaY)
+{
+  # Unweighted co-variance
+  covXY = cov(X,Y);
+
+  # compute standard deviations for both X and Y by computing 2^nd central moment
+  W = nrow(X);
+  m2X = moment(X,2);
+  m2Y = moment(Y,2);
+  sigmaX = sqrt(m2X * (W/(W-1.0)) );
+  sigmaY = sqrt(m2Y * (W/(W-1.0)) );
+
+  # Pearson's R
+  R = covXY / (sigmaX*sigmaY);
+}
+
+
+# Y points to SCALE variable
+# A points to CATEGORICAL variable
+bivar_sc = function(Matrix[Double] Y, Matrix[Double] A, Double maxDomain)
+  return (Double Eta, Double AnovaF, Double pval, Double bw_ss, Double within_ss, Double bw_df, Double within_df, Double bw_mean_square, Double within_mean_square)
+{
+  # mean and variance in target variable
+  W = nrow(A);
+  my = mean(Y);
+  varY = moment(Y,2) * W/(W-1.0)
+
+  # category-wise (frequencies, means, variances)
+  CFreqs = aggregate(target=Y, groups=A, fn="count", ngroups=maxDomain);
+  CMeans = aggregate(target=Y, groups=A, fn="mean", ngroups=maxDomain);
+  CVars =  aggregate(target=Y, groups=A, fn="variance", ngroups=maxDomain);
+
+  # number of categories
+  R = nrow(CFreqs);
+
+  Eta = sqrt(1 - ( sum((CFreqs-1)*CVars) / ((W-1)*varY) ));
+
+  bw_ss = sum( (CFreqs*(CMeans-my)^2) );
+  bw_df = as.double(R-1);
+  bw_mean_square = bw_ss/bw_df;
+
+  within_ss = sum( (CFreqs-1)*CVars );
+  within_df = as.double(W-R);
+  within_mean_square = within_ss/within_df;
+
+  AnovaF = bw_mean_square/within_mean_square;
+
+  pval = pf(target=AnovaF, df1=bw_df, df2=within_df, lower.tail=FALSE)
+}
+
+
+computeRanks = function(Matrix[Double] X)
+  return (Matrix[Double] Ranks) {
+  Ranks = cumsum(X) - X/2 + 1/2;
+}
+
+
+bivar_oo = function(Matrix[Double] A, Matrix[Double] B, Double maxDomain)
+  return (Double sp)
+{
+  # compute contingency table
+  F = table(A, B, maxDomain, maxDomain);
+  F = F[1:max(A), 1:max(B)];
+
+  catA = nrow(F);  # number of categories in A
+  catB = ncol(F);  # number of categories in B
+
+  # compute category-wise counts for both the attributes
+  R = rowSums(F);
+  S = colSums(F);
+
+  # compute scores, both are column vectors
+  [C] = computeRanks(R);
+  meanX = mean(C,R);
+
+  columnS = t(S);
+  [D] = computeRanks(columnS);
+
+  # scores (C,D) are individual values, and counts (R,S) act as weights
+  meanY = mean(D,columnS);
+
+  W = sum(F); # total weight, or total #cases
+  varX = moment(C,R,2)*(W/(W-1.0));
+  varY = moment(D,columnS,2)*(W/(W-1.0));
+  covXY = sum( t(F/(W-1) * (C-meanX)) * (D-meanY) );
+
+  sp = covXY/(sqrt(varX)*sqrt(varY));
+}
diff --git a/src/main/java/org/apache/sysds/common/Builtins.java b/src/main/java/org/apache/sysds/common/Builtins.java
index 9ff8f78..f565124 100644
--- a/src/main/java/org/apache/sysds/common/Builtins.java
+++ b/src/main/java/org/apache/sysds/common/Builtins.java
@@ -53,6 +53,7 @@
 	BITWXOR("bitwXor", false),
 	BITWSHIFTL("bitwShiftL", false),
 	BITWSHIFTR("bitwShiftR", false),
+	BIVAR("bivar", true),
 	CAST_AS_SCALAR("as.scalar", "castAsScalar", false),
 	CAST_AS_MATRIX("as.matrix", false),
 	CAST_AS_FRAME("as.frame", false),
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
index c51254b..726198a 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
@@ -68,7 +68,7 @@
 			f.channel().closeFuture().sync();
 		}
 		catch (InterruptedException e) {
-			log.error("Federated worker interrupted");
+			log.info("Federated worker interrupted");
 		}
 		finally {
 			log.info("Federated Worker Shutting down.");
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
index 0872c59..c8da781 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Future;
 
 import org.apache.log4j.Logger;
@@ -112,21 +113,6 @@
 		}
 	}
 
-	public static DoubleObject aggMinMax(Future<FederatedResponse>[] ffr, boolean isMin, boolean isScalar) {
-		try {
-			double res = isMin ? Double.MAX_VALUE : - Double.MAX_VALUE;
-			for (Future<FederatedResponse> fr: ffr){
-				double v = isScalar ? ((ScalarObject)fr.get().getData()[0]).getDoubleValue() :
-					isMin ? ((MatrixBlock) fr.get().getData()[0]).min() : ((MatrixBlock) fr.get().getData()[0]).max();
-				res = isMin ? Math.min(res, v) : Math.max(res, v);
-			}
-			return new DoubleObject(res);
-		}
-		catch (Exception ex) {
-			throw new DMLRuntimeException(ex);
-		}
-	}
-
 	public static MatrixBlock[] getResults(Future<FederatedResponse>[] ffr) {
 		try {
 			MatrixBlock[] ret = new MatrixBlock[ffr.length];
@@ -139,32 +125,61 @@
 		}
 	}
 
-	public static MatrixBlock rbind(Future<FederatedResponse>[] ffr) {
+	public static MatrixBlock bind(Future<FederatedResponse>[] ffr, boolean cbind) {
 		// TODO handle non-contiguous cases
 		try {
 			MatrixBlock[] tmp = getResults(ffr);
 			return tmp[0].append(
 				Arrays.copyOfRange(tmp, 1, tmp.length),
-				new MatrixBlock(), false);
+				new MatrixBlock(), cbind);
 		}
 		catch(Exception ex) {
 			throw new DMLRuntimeException(ex);
 		}
 	}
 
+	public static MatrixBlock aggMinMax(Future<FederatedResponse>[] ffr, boolean isMin, boolean isScalar, Optional<FederationMap.FType> fedType) {
+		try {
+			if (!fedType.isPresent() || fedType.get() == FederationMap.FType.OTHER) {
+				double res = isMin ? Double.MAX_VALUE : -Double.MAX_VALUE;
+				for (Future<FederatedResponse> fr : ffr) {
+					double v = isScalar ? ((ScalarObject) fr.get().getData()[0]).getDoubleValue() :
+						isMin ? ((MatrixBlock) fr.get().getData()[0]).min() : ((MatrixBlock) fr.get().getData()[0]).max();
+					res = isMin ? Math.min(res, v) : Math.max(res, v);
+				}
+				return new MatrixBlock(1, 1, res);
+			} else {
+				MatrixBlock[] tmp = getResults(ffr);
+				int dim = fedType.get() == FederationMap.FType.COL ? tmp[0].getNumRows() : tmp[0].getNumColumns();
+
+				for (int i = 0; i < ffr.length - 1; i++)
+					for (int j = 0; j < dim; j++)
+						if (fedType.get() == FederationMap.FType.COL)
+							tmp[i + 1].setValue(j, 0, isMin ? Double.min(tmp[i].getValue(j, 0), tmp[i + 1].getValue(j, 0)) :
+								Double.max(tmp[i].getValue(j, 0), tmp[i + 1].getValue(j, 0)));
+						else tmp[i + 1].setValue(0, j, isMin ? Double.min(tmp[i].getValue(0, j), tmp[i + 1].getValue(0, j)) :
+							Double.max(tmp[i].getValue(0, j), tmp[i + 1].getValue(0, j)));
+				return tmp[ffr.length-1];
+			}
+		}
+		catch (Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+	}
+
 	public static ScalarObject aggScalar(AggregateUnaryOperator aop, Future<FederatedResponse>[] ffr) {
 		if(!(aop.aggOp.increOp.fn instanceof KahanFunction || (aop.aggOp.increOp.fn instanceof Builtin &&
-				(((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
-						((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)))) {
+			(((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
+				((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)))) {
 			throw new DMLRuntimeException("Unsupported aggregation operator: "
-					+ aop.aggOp.increOp.getClass().getSimpleName());
+				+ aop.aggOp.increOp.getClass().getSimpleName());
 		}
 
 		try {
 			if(aop.aggOp.increOp.fn instanceof Builtin){
 				// then we know it is a Min or Max based on the previous check.
 				boolean isMin = ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN;
-				return aggMinMax(ffr, isMin, true);
+				return new DoubleObject(aggMinMax(ffr, isMin, true,  Optional.empty()).getValue(0,0));
 			}
 			else {
 				double sum = 0; //uak+
@@ -179,23 +194,21 @@
 	}
 
 	public static MatrixBlock aggMatrix(AggregateUnaryOperator aop, Future<FederatedResponse>[] ffr, FederationMap map) {
-		// handle row aggregate
-		if( aop.isRowAggregate() ) {
-			//independent of aggregation function for row-partitioned federated matrices
-			return rbind(ffr);
-		}
-		// handle col aggregate
-		if( aop.aggOp.increOp.fn instanceof KahanFunction )
+		if (aop.isRowAggregate() && map.getType() == FederationMap.FType.ROW)
+			return bind(ffr, false);
+		else if (aop.isColAggregate() && map.getType() == FederationMap.FType.COL)
+			return bind(ffr, true);
+
+		if (aop.aggOp.increOp.fn instanceof KahanFunction)
 			return aggAdd(ffr);
 		else if( aop.aggOp.increOp.fn instanceof Mean )
 			return aggMean(ffr, map);
 		else if (aop.aggOp.increOp.fn instanceof Builtin &&
 			(((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
-			((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)) {
+				((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)) {
 			boolean isMin = ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN;
-			return new MatrixBlock(1,1,aggMinMax(ffr, isMin, false).getDoubleValue());
-		}
-		else
+			return aggMinMax(ffr,isMin,false, Optional.of(map.getType()));
+		} else
 			throw new DMLRuntimeException("Unsupported aggregation operator: "
 				+ aop.aggOp.increOp.fn.getClass().getSimpleName());
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
index c28a163..bdb9784 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
@@ -84,7 +84,7 @@
 				FederatedRequest fr4 = mo1.getFedMapping().cleanup(getTID(), fr1.getID(), fr2.getID());
 				//execute federated operations and aggregate
 				Future<FederatedResponse>[] tmp = mo1.getFedMapping().execute(getTID(), fr1, fr2, fr3, fr4);
-				MatrixBlock ret = FederationUtils.rbind(tmp);
+				MatrixBlock ret = FederationUtils.bind(tmp, false);
 				ec.setMatrixOutput(output.getName(), ret);
 			}
 			else { //MM
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
index b810d1f..ed9150a 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
@@ -19,6 +19,9 @@
 
 package org.apache.sysds.test.functions.federated.algorithms;
 
+import java.util.Arrays;
+import java.util.Collection;
+
 import org.apache.sysds.common.Types;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.runtime.util.HDFSTool;
@@ -26,20 +29,16 @@
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.Arrays;
-import java.util.Collection;
-
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
 public class FederatedBivarTest extends AutomatedTestBase {
 	private final static String TEST_DIR = "functions/federated/";
 	private final static String TEST_NAME = "FederatedBivarTest";
-	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedUnivarTest.class.getSimpleName() + "/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedBivarTest.class.getSimpleName() + "/";
 	private final static int blocksize = 1024;
 	@Parameterized.Parameter()
 	public int rows;
@@ -54,21 +53,15 @@
 
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
-		return Arrays.asList(new Object[][] {
-				{10000, 10},
-//				{2000, 50}, {1000, 10},
-//				{10000, 10}, {2000, 50}, {1000, 100}
-		});
+		return Arrays.asList(new Object[][] {{10000, 16}, {2000, 32}, {1000, 64}, {10000, 128}});
 	}
 
 	@Test
-	@Ignore
 	public void federatedBivarSinglenode() {
 		federatedL2SVM(Types.ExecMode.SINGLE_NODE);
 	}
 
 	@Test
-	@Ignore
 	public void federatedBivarHybrid() {
 		federatedL2SVM(Types.ExecMode.HYBRID);
 	}
@@ -80,27 +73,26 @@
 		String HOME = SCRIPT_DIR + TEST_DIR;
 
 		// write input matrices
-		int quarterRows = rows / 4;
-		// We have two matrices handled by a single federated worker
-		double[][] X1 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 3);
-		double[][] X2 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 7);
-		double[][] X3 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 8);
-		double[][] X4 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 9);
+		int quarterCols = cols / 4;
+
+		double[][] X1 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 3);
+		double[][] X2 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 7);
+		double[][] X3 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 8);
+		double[][] X4 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 9);
+
+		// generate attribute set
+		double[][] S1 = getRandomMatrix(1, (int) cols / 4, 1, cols, 1, 3);
+		TestUtils.floor(S1);
+		double[][] S2 = getRandomMatrix(1, (int) cols / 4, 1, cols, 1, 9);
+		TestUtils.floor(S2);
 
 		// write types matrix shape of (1, D)
-		double [][] T1 = getRandomMatrix(1, cols, 0, 2, 1, 9);
-		Arrays.stream(T1[0]).forEach(n -> Math.ceil(n));
+		double[][] T1 = getRandomMatrix(1, (int) cols / 4, 0, 2, 1, 9);
+		TestUtils.ceil(T1);
+		double[][] T2 = getRandomMatrix(1, (int) cols / 4, 0, 2, 1, 9);
+		TestUtils.ceil(T2);
 
-		double [][] T2 = getRandomMatrix(1, cols, 0, 2, 1, 9);
-		Arrays.stream(T2[0]).forEach(n -> Math.ceil(n));
-
-		double [][] S1 = getRandomMatrix(1, (int) cols/5, 1, cols, 1, 3);
-		Arrays.stream(S1[0]).forEach(n -> Math.ceil(n));
-
-		double [][] S2 = getRandomMatrix(1, (int) cols/4, 1, cols, 1, 9);
-		Arrays.stream(S2[0]).forEach(n -> Math.ceil(n));
-
-		MatrixCharacteristics mc= new MatrixCharacteristics(quarterRows, cols, blocksize, quarterRows * cols);
+		MatrixCharacteristics mc = new MatrixCharacteristics(rows, quarterCols, blocksize, rows * quarterCols);
 		writeInputMatrixWithMTD("X1", X1, false, mc);
 		writeInputMatrixWithMTD("X2", X2, false, mc);
 		writeInputMatrixWithMTD("X3", X3, false, mc);
@@ -116,52 +108,34 @@
 		int port2 = getRandomAvailablePort();
 		int port3 = getRandomAvailablePort();
 		int port4 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorkerThread(port1);
-		Thread t2 = startLocalFedWorkerThread(port2);
-		Thread t3 = startLocalFedWorkerThread(port3);
-		Thread t4 = startLocalFedWorkerThread(port4);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
+		Process t3 = startLocalFedWorker(port3);
+		Process t4 = startLocalFedWorker(port4);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
-		
 
 		// Run reference dml script with normal matrix
 		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
-		programArgs = new String[] {"-stats", "-args", input("X1"), input("X2"), input("X3"), input("X4"), input("S1"), input("S2"), input("T1"), input("T2"), expected("B")};
+		programArgs = new String[] {"-stats", "-args", input("X1"), input("X2"), input("X3"), input("X4"), input("S1"),
+			input("S2"), input("T1"), input("T2"), expected("B")};
 		runTest(true, false, null, -1);
 
 		// Run actual dml script with federated matrix
 		fullDMLScriptName = HOME + TEST_NAME + ".dml";
-		programArgs = new String[] {"-stats", "-nvargs",
-			"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+		programArgs = new String[] {"-stats", "-nvargs", "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
 			"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
 			"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
-			"in_X4=" + TestUtils.federatedAddress(port4, input("X4")),
-			"in_S1=" + input("S1"),
-			"in_S2=" + input("S2"),
-			"in_T1=" + input("T1"),
-			"in_T2=" + input("T2"),
-			"rows=" + rows, "cols=" + cols,
-			"out=" + output("B")};
+			"in_X4=" + TestUtils.federatedAddress(port4, input("X4")), "in_S1=" + input("S1"), "in_S2=" + input("S2"),
+			"in_T1=" + input("T1"), "in_T2=" + input("T2"), "rows=" + rows, "cols=" + cols, "out=" + output("B")};
 		runTest(true, false, null, -1);
 
 		// compare via files
-//		compareResults(1e-9);
+		compareResults(1e-9);
 		TestUtils.shutdownThreads(t1, t2, t3, t4);
 
-		// check for federated operations
-//		Assert.assertTrue(heavyHittersContainsString("fed_ba+*"));
-//		Assert.assertTrue(heavyHittersContainsString("fed_uack+"));
-//		Assert.assertTrue(heavyHittersContainsString("fed_tsmm"));
-//		if( scaleAndShift ) {
-//			Assert.assertTrue(heavyHittersContainsString("fed_uacsqk+"));
-//			Assert.assertTrue(heavyHittersContainsString("fed_uacmean"));
-//			Assert.assertTrue(heavyHittersContainsString("fed_-"));
-//			Assert.assertTrue(heavyHittersContainsString("fed_/"));
-//			Assert.assertTrue(heavyHittersContainsString("fed_replace"));
-//		}
-
-		//check that federated input files are still existing
+		// check that federated input files are still existing
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java
new file mode 100644
index 0000000..00f0f50
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.primitives;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.lops.LopProperties.ExecType;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedFullAggregateTest extends AutomatedTestBase {
+	private final static String TEST_NAME1 = "FederatedSumTest";
+	private final static String TEST_NAME2 = "FederatedMeanTest";
+	private final static String TEST_NAME3 = "FederatedMaxTest";
+	private final static String TEST_NAME4 = "FederatedMinTest";
+
+	private final static String TEST_DIR = "functions/federated/aggregate/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + FederatedFullAggregateTest.class.getSimpleName() + "/";
+
+	private final static int blocksize = 1024;
+	@Parameterized.Parameter()
+	public int rows;
+	@Parameterized.Parameter(1)
+	public int cols;
+	@Parameterized.Parameter(2)
+	public boolean rowPartitioned;
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		return Arrays.asList(
+			new Object[][] {{10, 1000, false}, {100, 4, false}, {36, 1000, true}, {1000, 10, true}, {4, 100, true}});
+	}
+
+	private enum OpType {
+		SUM, MEAN, MAX, MIN
+	}
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"S.scalar"}));
+		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"S.scalar"}));
+		addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {"S.scalar"}));
+		addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {"S.scalar"}));
+	}
+
+	@Test
+	public void testSumDenseMatrixCP() {
+		runColAggregateOperationTest(OpType.SUM, ExecType.CP);
+	}
+
+	@Test
+	public void testMeanDenseMatrixCP() {
+		runColAggregateOperationTest(OpType.MEAN, ExecType.CP);
+	}
+
+	@Test
+	public void testMaxDenseMatrixCP() {
+		runColAggregateOperationTest(OpType.MAX, ExecType.CP);
+	}
+
+	@Test
+	public void testMinDenseMatrixCP() {
+		runColAggregateOperationTest(OpType.MIN, ExecType.CP);
+	}
+
+	@Test
+	public void testSumDenseMatrixSP() {
+		runColAggregateOperationTest(OpType.SUM, ExecType.SPARK);
+	}
+
+	@Test
+	public void testMeanDenseMatrixSP() {
+		runColAggregateOperationTest(OpType.MEAN, ExecType.SPARK);
+	}
+
+	@Test
+	public void testMaxDenseMatrixSP() {
+		runColAggregateOperationTest(OpType.MAX, ExecType.SPARK);
+	}
+
+	@Test
+	public void testMinDenseMatrixSP() {
+		runColAggregateOperationTest(OpType.MIN, ExecType.SPARK);
+	}
+
+	private void runColAggregateOperationTest(OpType type, ExecType instType) {
+		ExecMode platformOld = rtplatform;
+		switch(instType) {
+			case SPARK:
+				rtplatform = ExecMode.SPARK;
+				break;
+			default:
+				rtplatform = ExecMode.HYBRID;
+				break;
+		}
+
+		if(rtplatform == ExecMode.SPARK)
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+		String TEST_NAME = null;
+		switch(type) {
+			case SUM:
+				TEST_NAME = TEST_NAME1;
+				break;
+			case MEAN:
+				TEST_NAME = TEST_NAME2;
+				break;
+			case MAX:
+				TEST_NAME = TEST_NAME3;
+				break;
+			case MIN:
+				TEST_NAME = TEST_NAME4;
+				break;
+		}
+
+		getAndLoadTestConfiguration(TEST_NAME);
+		String HOME = SCRIPT_DIR + TEST_DIR;
+
+		// write input matrices
+		int r = rows;
+		int c = cols / 4;
+		if(rowPartitioned) {
+			r = rows / 4;
+			c = cols;
+		}
+
+		double[][] X1 = getRandomMatrix(r, c, 1, 5, 1, 3);
+		double[][] X2 = getRandomMatrix(r, c, 1, 5, 1, 7);
+		double[][] X3 = getRandomMatrix(r, c, 1, 5, 1, 8);
+		double[][] X4 = getRandomMatrix(r, c, 1, 5, 1, 9);
+
+		MatrixCharacteristics mc = new MatrixCharacteristics(r, c, blocksize, r * c);
+		writeInputMatrixWithMTD("X1", X1, false, mc);
+		writeInputMatrixWithMTD("X2", X2, false, mc);
+		writeInputMatrixWithMTD("X3", X3, false, mc);
+		writeInputMatrixWithMTD("X4", X4, false, mc);
+
+		// empty script name because we don't execute any script, just start the worker
+		fullDMLScriptName = "";
+		int port1 = getRandomAvailablePort();
+		int port2 = getRandomAvailablePort();
+		int port3 = getRandomAvailablePort();
+		int port4 = getRandomAvailablePort();
+		Thread t1 = startLocalFedWorkerThread(port1);
+		Thread t2 = startLocalFedWorkerThread(port2);
+		Thread t3 = startLocalFedWorkerThread(port3);
+		Thread t4 = startLocalFedWorkerThread(port4);
+
+		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+		loadTestConfiguration(config);
+
+		// Run reference dml script with normal matrix
+		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+		programArgs = new String[] {"-stats", "100", "-args", input("X1"), input("X2"), input("X3"), input("X4"),
+			expected("S"), Boolean.toString(rowPartitioned).toUpperCase()};
+		runTest(true, false, null, -1);
+
+		// Run actual dml script with federated matrix
+
+		fullDMLScriptName = HOME + TEST_NAME + ".dml";
+		programArgs = new String[] {"-stats", "100", "-nvargs",
+			"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+			"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
+			"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
+			"in_X4=" + TestUtils.federatedAddress(port4, input("X4")), "rows=" + rows, "cols=" + cols,
+			"rP=" + Boolean.toString(rowPartitioned).toUpperCase(), "out_S=" + output("S")};
+
+		runTest(true, false, null, -1);
+
+		// compare via files
+		compareResults(1e-9);
+
+		switch(type) {
+			case SUM:
+				Assert.assertTrue(heavyHittersContainsString("fed_uak+"));
+				break;
+			case MEAN:
+				Assert.assertTrue(heavyHittersContainsString("fed_uamean"));
+				break;
+			case MAX:
+				Assert.assertTrue(heavyHittersContainsString("fed_uamax"));
+				break;
+			case MIN:
+				Assert.assertTrue(heavyHittersContainsString("fed_uamin"));
+				break;
+		}
+
+		// check that federated input files are still existing
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+
+		TestUtils.shutdownThreads(t1, t2, t3, t4);
+		resetExecMode(platformOld);
+
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java
new file mode 100644
index 0000000..b1c3991
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.primitives;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedRowColAggregateTest extends AutomatedTestBase {
+	private final static String TEST_NAME1 = "FederatedColSumTest";
+	private final static String TEST_NAME2 = "FederatedColMeanTest";
+	private final static String TEST_NAME3 = "FederatedColMaxTest";
+	private final static String TEST_NAME4 = "FederatedColMinTest";
+	private final static String TEST_NAME5 = "FederatedRowSumTest";
+	private final static String TEST_NAME6 = "FederatedRowMeanTest";
+	private final static String TEST_NAME7 = "FederatedRowMaxTest";
+	private final static String TEST_NAME8 = "FederatedRowMinTest";
+
+	private final static String TEST_DIR = "functions/federated/aggregate/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + FederatedRowColAggregateTest.class.getSimpleName() + "/";
+
+	private final static int blocksize = 1024;
+	@Parameterized.Parameter()
+	public int rows;
+	@Parameterized.Parameter(1)
+	public int cols;
+	@Parameterized.Parameter(2)
+	public boolean rowPartitioned;
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		return Arrays.asList(
+			new Object[][] {{10, 1000, false}, 
+			//{100, 4, false}, {36, 1000, true}, {1000, 10, true}, {4, 100, true}
+		});
+	}
+
+	private enum OpType {
+		SUM, MEAN, MAX, MIN
+	}
+
+	private enum InstType {
+		ROW, COL
+	}
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"S"}));
+		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"S"}));
+		addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {"S"}));
+		addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {"S"}));
+		addTestConfiguration(TEST_NAME5, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME5, new String[] {"S"}));
+		addTestConfiguration(TEST_NAME6, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME6, new String[] {"S"}));
+		addTestConfiguration(TEST_NAME7, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME7, new String[] {"S"}));
+		addTestConfiguration(TEST_NAME8, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME8, new String[] {"S"}));
+	}
+
+	@Test
+	public void testColSumDenseMatrixCP() {
+		runAggregateOperationTest(OpType.SUM, InstType.COL, ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	public void testColMeanDenseMatrixCP() {
+		runAggregateOperationTest(OpType.MEAN, InstType.COL, ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	public void testColMaxDenseMatrixCP() {
+		runAggregateOperationTest(OpType.MAX, InstType.COL, ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	public void testRowSumDenseMatrixCP() {
+		runAggregateOperationTest(OpType.SUM, InstType.ROW, ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	public void testRowMeanDenseMatrixCP() {
+		runAggregateOperationTest(OpType.MEAN, InstType.ROW, ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	public void testRowMaxDenseMatrixCP() {
+		runAggregateOperationTest(OpType.MAX, InstType.ROW, ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	public void testRowMinDenseMatrixCP() {
+		runAggregateOperationTest(OpType.MIN, InstType.ROW, ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	public void testColMinDenseMatrixCP() {
+		runAggregateOperationTest(OpType.MIN, InstType.COL, ExecMode.SINGLE_NODE);
+	}
+
+	private void runAggregateOperationTest(OpType type, InstType instr, ExecMode execMode) {
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		ExecMode platformOld = rtplatform;
+
+		if(rtplatform == ExecMode.SPARK)
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+		String TEST_NAME = null;
+		switch(type) {
+			case SUM:
+				TEST_NAME = instr == InstType.COL ? TEST_NAME1 : TEST_NAME5;
+				break;
+			case MEAN:
+				TEST_NAME = instr == InstType.COL ? TEST_NAME2 : TEST_NAME6;
+				break;
+			case MAX:
+				TEST_NAME = instr == InstType.COL ? TEST_NAME3 : TEST_NAME7;
+				break;
+			case MIN:
+				TEST_NAME = instr == InstType.COL ? TEST_NAME4 : TEST_NAME8;
+				break;
+		}
+
+		getAndLoadTestConfiguration(TEST_NAME);
+		String HOME = SCRIPT_DIR + TEST_DIR;
+
+		// write input matrices
+		int r = rows;
+		int c = cols / 4;
+		if(rowPartitioned) {
+			r = rows / 4;
+			c = cols;
+		}
+
+		double[][] X1 = getRandomMatrix(r, c, 1, 5, 1, 3);
+		double[][] X2 = getRandomMatrix(r, c, 1, 5, 1, 7);
+		double[][] X3 = getRandomMatrix(r, c, 1, 5, 1, 8);
+		double[][] X4 = getRandomMatrix(r, c, 1, 5, 1, 9);
+
+		MatrixCharacteristics mc = new MatrixCharacteristics(r, c, blocksize, r * c);
+		writeInputMatrixWithMTD("X1", X1, false, mc);
+		writeInputMatrixWithMTD("X2", X2, false, mc);
+		writeInputMatrixWithMTD("X3", X3, false, mc);
+		writeInputMatrixWithMTD("X4", X4, false, mc);
+
+		// empty script name because we don't execute any script, just start the worker
+		fullDMLScriptName = "";
+		int port1 = getRandomAvailablePort();
+		int port2 = getRandomAvailablePort();
+		int port3 = getRandomAvailablePort();
+		int port4 = getRandomAvailablePort();
+		Thread t1 = startLocalFedWorkerThread(port1);
+		Thread t2 = startLocalFedWorkerThread(port2);
+		Thread t3 = startLocalFedWorkerThread(port3);
+		Thread t4 = startLocalFedWorkerThread(port4);
+
+		rtplatform = execMode;
+		if(rtplatform == ExecMode.SPARK) {
+			System.out.println(7);
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		}
+		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+		loadTestConfiguration(config);
+
+		// Run reference dml script with normal matrix
+		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+		programArgs = new String[] {"-stats", "100", "-args", input("X1"), input("X2"), input("X3"), input("X4"),
+			expected("S"), Boolean.toString(rowPartitioned).toUpperCase()};
+		runTest(true, false, null, -1);
+
+		// Run actual dml script with federated matrix
+
+		fullDMLScriptName = HOME + TEST_NAME + ".dml";
+		programArgs = new String[] {"-stats", "100", "-nvargs",
+			"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+			"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
+			"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
+			"in_X4=" + TestUtils.federatedAddress(port4, input("X4")), "rows=" + rows, "cols=" + cols,
+			"rP=" + Boolean.toString(rowPartitioned).toUpperCase(), "out_S=" + output("S")};
+
+		runTest(true, false, null, -1);
+
+		// compare via files
+		compareResults(1e-9);
+
+		String fedInst = instr == InstType.COL ? "fed_uac" : "fed_uar";
+
+		switch(type) {
+			case SUM:
+				Assert.assertTrue(heavyHittersContainsString(fedInst.concat("k+")));
+				break;
+			case MEAN:
+				Assert.assertTrue(heavyHittersContainsString(fedInst.concat("mean")));
+				break;
+			case MAX:
+				Assert.assertTrue(heavyHittersContainsString(fedInst.concat("max")));
+				break;
+			case MIN:
+				Assert.assertTrue(heavyHittersContainsString(fedInst.concat("min")));
+				break;
+		}
+
+		// check that federated input files are still existing
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+
+		TestUtils.shutdownThreads(t1, t2, t3, t4);
+
+		rtplatform = platformOld;
+		DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+
+	}
+}
diff --git a/src/test/scripts/functions/federated/FederatedBivarTest.dml b/src/test/scripts/functions/federated/FederatedBivarTest.dml
index 94b197a..9a0ba4a 100644
--- a/src/test/scripts/functions/federated/FederatedBivarTest.dml
+++ b/src/test/scripts/functions/federated/FederatedBivarTest.dml
@@ -20,12 +20,10 @@
 #-------------------------------------------------------------
 
 X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
-    ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
-		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+    ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2), list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
 S1 = read($in_S1);
 S2 = read($in_S2);
 T1 = read($in_T1);
 T2 = read($in_T2);
-B = bivar(X=X, S1=S1, S2=S2, T1=T1, T2=T2);
+B = bivar(X=X, S1=S1, S2=S2, T1=T1, T2=T2, verbose=FALSE);
 write(B, $out);
-
diff --git a/src/test/scripts/functions/federated/FederatedBivarTestReference.dml b/src/test/scripts/functions/federated/FederatedBivarTestReference.dml
index b28347c..565b941 100644
--- a/src/test/scripts/functions/federated/FederatedBivarTestReference.dml
+++ b/src/test/scripts/functions/federated/FederatedBivarTestReference.dml
@@ -19,10 +19,10 @@
 #
 #-------------------------------------------------------------
 
-X = rbind(read($1), read($2), read($3), read($4));
+X = cbind(read($1), read($2), read($3), read($4));
 S1 = read($5);
 S2 = read($6);
 T1 = read($7);
 T2 = read($8);
-B = bivar(X=X, S1=S1, S2=S2, T1=T1, T2=T2);
-write(B, $9);
\ No newline at end of file
+B = bivar(X=X, S1=S1, S2=S2, T1=T1, T2=T2, verbose=FALSE);
+write(B, $9);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedColMaxTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedColMaxTest.dml
new file mode 100644
index 0000000..85e2ce6
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedColMaxTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+
+s = colMaxs(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedColMaxTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedColMaxTestReference.dml
new file mode 100644
index 0000000..7a51647
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedColMaxTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = colMaxs(A);
+
+write(s, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedColMeanTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedColMeanTest.dml
new file mode 100644
index 0000000..f455b62
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedColMeanTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+
+s = colMeans(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedColMeanTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedColMeanTestReference.dml
new file mode 100644
index 0000000..934fee5
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedColMeanTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = colMeans(A);
+
+write(s, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedColMinTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedColMinTest.dml
new file mode 100644
index 0000000..db19f0e
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedColMinTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+
+s = colMins(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedColMinTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedColMinTestReference.dml
new file mode 100644
index 0000000..ba3c967
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedColMinTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = colMins(A);
+
+write(s, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedColSumTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedColSumTest.dml
new file mode 100644
index 0000000..d5cf9b8
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedColSumTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+
+s = colSums(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedColSumTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedColSumTestReference.dml
new file mode 100644
index 0000000..aba56e8
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedColSumTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = colSums(A);
+
+write(s, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedMaxTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedMaxTest.dml
new file mode 100644
index 0000000..c5c209f
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedMaxTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+
+s = max(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedMaxTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedMaxTestReference.dml
new file mode 100644
index 0000000..887cd3c
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedMaxTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = max(A);
+
+write(s, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedMeanTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedMeanTest.dml
new file mode 100644
index 0000000..22d3f0b
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedMeanTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+
+s = mean(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedMeanTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedMeanTestReference.dml
new file mode 100644
index 0000000..8566399
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedMeanTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = mean(A);
+
+write(s, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedMinTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedMinTest.dml
new file mode 100644
index 0000000..990cd3a
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedMinTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+
+s = min(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedMinTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedMinTestReference.dml
new file mode 100644
index 0000000..353fa4d
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedMinTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = min(A);
+
+write(s, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedRowMaxTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedRowMaxTest.dml
new file mode 100644
index 0000000..76af48a
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedRowMaxTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+
+s = rowMaxs(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedRowMaxTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedRowMaxTestReference.dml
new file mode 100644
index 0000000..485ad13
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedRowMaxTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = rowMaxs(A);
+
+write(s, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedRowMeanTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedRowMeanTest.dml
new file mode 100644
index 0000000..1dac255
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedRowMeanTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+
+s = rowMeans(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedRowMeanTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedRowMeanTestReference.dml
new file mode 100644
index 0000000..fd21323
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedRowMeanTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = rowMeans(A);
+
+write(s, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedRowMinTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedRowMinTest.dml
new file mode 100644
index 0000000..11932df
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedRowMinTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+
+s = rowMins(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedRowMinTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedRowMinTestReference.dml
new file mode 100644
index 0000000..b4b050a
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedRowMinTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = rowMins(A);
+
+write(s, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedRowSumTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedRowSumTest.dml
new file mode 100644
index 0000000..2f39039
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedRowSumTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+
+s = rowSums(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedRowSumTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedRowSumTestReference.dml
new file mode 100644
index 0000000..1dab2a5
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedRowSumTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = rowSums(A);
+
+write(s, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedSumTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedSumTest.dml
new file mode 100644
index 0000000..9de439e
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedSumTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+
+s = sum(A);
+
+write(s, $out_S);
\ No newline at end of file
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedSumTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedSumTestReference.dml
new file mode 100644
index 0000000..d7d3540
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedSumTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = sum(A);
+
+write(s, $5);