[SYSTEMDS-2681] Federated Bivariate Statistics
SYSTEMDS 2543-2544 Federated Aggregations:
- Federated Min + Max col and row aggregation
- Federated mean and sum aggregations
Closes #1040
diff --git a/scripts/builtin/bivar.dml b/scripts/builtin/bivar.dml
new file mode 100644
index 0000000..e05f483
--- /dev/null
+++ b/scripts/builtin/bivar.dml
@@ -0,0 +1,324 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+#
+# For a given pair of attribute sets, compute bivariate statistics between all attribute pairs.
+# Given, index1 = {A_11, A_12, ... A_1m} and index2 = {A_21, A_22, ... A_2n}
+# compute bivariate stats for m*n pairs (A_1i, A_2j), (1<= i <=m) and (1<= j <=n).
+#
+# INPUT PARAMETERS:
+# -------------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# -------------------------------------------------------------------------------------------------
+# X Matrix[Double] --- Input matrix
+# S1 Matrix[Integer] --- First attribute set {A_11, A_12, ... A_1m}
+# S2 Matrix[Integer] --- Second attribute set {A_21, A_22, ... A_2n}
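+# T1 Matrix[Integer] --- Kind for attributes in S1 (kind=1 for scale, kind=2 for nominal, kind=3 for ordinal)
+# T2 Matrix[Integer] --- Kind for attributes in S2 (kind=1 for scale, kind=2 for nominal, kind=3 for ordinal)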
+# verbose Boolean --- Print bivar stats
+# -------------------------------------------------------------------------------------------------
+# OUTPUT: Four matrices with bivar stats (scale-scale, nominal-scale, nominal-nominal, ordinal-ordinal), one column per attribute pair
+#-------------------------------------------------------------
+
+m_bivar = function(Matrix[Double] X, Matrix[Double] S1, Matrix[Double] S2, Matrix[Double] T1, Matrix[Double] T2, Boolean verbose)
+	return (Matrix[Double] basestats_scale_scale, Matrix[Double] basestats_nominal_scale, Matrix[Double] basestats_nominal_nominal, Matrix[Double] basestats_ordinal_ordinal)
+{
+	# Computes bivariate statistics for every pair (S1[1,i], S2[1,j]) of columns of X, dispatching on the kinds in T1/T2.
+	s1size = ncol(S1);
+	s2size = ncol(S2);
+	numPairs = s1size * s2size;
+
+	# Pass 1: count the pairs per test family and remember, for each pair, which output column it will fill.
+	# pair2row[,1] is the column in the family's result matrix, pair2row[,2] the column in the ordinal-ordinal matrix.
+
+	num_scale_scale_tests = 0
+	num_nominal_nominal_tests = 0
+	num_ordinal_ordinal_tests = 0
+	num_nominal_scale_tests = 0
+
+	pair2row = matrix(0, rows=numPairs, cols=2)
+	for( i in 1:s1size, check=0) {
+		pre_t1 = as.scalar(T1[1,i]);
+
+		for( j in 1:s2size, check=0) {
+			pre_pairID = (i-1)*s2size+j;
+			pre_t2 = as.scalar(T2[1,j]);
+
+			if (pre_t1 == pre_t2) {
+				if (pre_t1 == 1) {
+					num_scale_scale_tests = num_scale_scale_tests + 1
+					pair2row[pre_pairID,1] = num_scale_scale_tests
+				}
+				else {
+					# equal categorical kinds share the nominal-nominal matrix; ordinal-ordinal also gets its own column
+					num_nominal_nominal_tests = num_nominal_nominal_tests + 1
+					pair2row[pre_pairID,1] = num_nominal_nominal_tests
+					if ( pre_t1 == 3 ) {
+						num_ordinal_ordinal_tests = num_ordinal_ordinal_tests + 1
+						pair2row[pre_pairID, 2] = num_ordinal_ordinal_tests
+					}
+				}
+			}
+			else {
+				if (pre_t1 == 1 | pre_t2 == 1) {
+					num_nominal_scale_tests = num_nominal_scale_tests + 1
+					pair2row[pre_pairID,1] = num_nominal_scale_tests
+				}
+				else {
+					# mixed nominal/ordinal pair
+					num_nominal_nominal_tests = num_nominal_nominal_tests + 1
+					pair2row[pre_pairID,1] = num_nominal_nominal_tests
+				}
+			}
+		}
+	}
+
+	size_scale_scale_tests = max(num_scale_scale_tests, 1);
+	size_nominal_nominal_tests = max(num_nominal_nominal_tests, 1)
+	size_ordinal_ordinal_tests = max(num_ordinal_ordinal_tests, 1);
+	size_nominal_scale_tests = max(num_nominal_scale_tests, 1);
+
+	# One column per pair; the max(...,1) above keeps every result matrix non-empty even without pairs of that family.
+	basestats_scale_scale = matrix(0, rows=6, cols=size_scale_scale_tests)
+	basestats_nominal_nominal = matrix(0, rows=6, cols=size_nominal_nominal_tests)
+	basestats_ordinal_ordinal = matrix(0, rows=3, cols=size_ordinal_ordinal_tests)
+	basestats_nominal_scale = matrix(0, rows=11, cols=size_nominal_scale_tests)
+
+	# Compute max domain size among all categorical attributes
+	# and check that these columns have been recoded (minimum value >= 1)
+
+	debug_str = "Stopping execution of DML script due to invalid input";
+	error_flag = FALSE;
+	maxs = colMaxs(X);
+	mins = colMins(X)
+	maxDomainSize = -1.0;
+	for(k in 1:ncol(T1) ) {
+		type = as.scalar(T1[1,k]);
+		# kind > 1 means categorical (nominal or ordinal)
+		if ( type > 1) {
+			colID = as.scalar(S1[1,k]);
+			colMaximum = as.scalar(maxs[1,colID]);
+			if(maxDomainSize < colMaximum) maxDomainSize = colMaximum;
+			colMinimum = as.scalar(mins[1,colID]);
+			# collect every violation first; stop() is only called after both attribute sets are checked
+			if(colMinimum < 1){
+				debug_str = ifelse(type == 2,
+					append(debug_str, "Column " + colID + " was declared as nominal but its minimum value is " + colMinimum),
+					append(debug_str, "Column " + colID + " was declared as ordinal but its minimum value is " + colMinimum));
+				error_flag = TRUE;
+			}
+		}
+	}
+
+	for(k in 1:ncol(T2) ) {
+		type = as.scalar(T2[1,k]);
+		if ( type > 1) {
+			# the column index lives in S2; T2 only holds the kind (1/2/3) of that column
+			colID = as.scalar(S2[1,k]);
+			colMaximum = as.scalar(maxs[1,colID]);
+			maxDomainSize = max(maxDomainSize, colMaximum);
+			colMinimum = as.scalar(mins[1,colID]);
+			# same recoding check as for the S1 attributes
+			if(colMinimum < 1){
+				debug_str = ifelse(type == 2,
+					append(debug_str, "Column " + colID + " was declared as nominal but its minimum value is " + colMinimum),
+					append(debug_str, "Column " + colID + " was declared as ordinal but its minimum value is " + colMinimum));
+				error_flag = TRUE;
+			}
+		}
+	}
+	maxDomain = as.integer(maxDomainSize);
+	if(error_flag)
+		stop(debug_str);
+
+	parfor( i in 1:s1size, check=0) {
+		a1 = as.scalar(S1[1,i]);
+		k1 = as.scalar(T1[1,i]);
+		A1 = X[,a1];
+		parfor( j in 1:s2size, check=0) {
+			pairID = (i-1)*s2size+j;
+			a2 = as.scalar(S2[1,j]);
+			k2 = as.scalar(T2[1,j]);
+			A2 = X[,a2];
+			rowid1 = as.scalar(pair2row[pairID, 1])
+			rowid2 = as.scalar(pair2row[pairID, 2])
+
+			if (k1 == k2) {
+				if (k1 == 1) {
+					# scale-scale
+					if (verbose == TRUE) print("[" + i + "," + j + "] scale-scale");
+					[r, cov, sigma1, sigma2] = bivar_ss(A1,A2);
+					basestats_scale_scale[1:6,rowid1] = as.matrix(list(a1,a2,r,cov,sigma1,sigma2));
+				}
+				else {
+					# nominal-nominal or ordinal-ordinal
+					if (verbose == TRUE) print("[" + i + "," + j + "] categorical-categorical");
+					[chisq, df, pval, cramersv] = bivar_cc(A1, A2, maxDomain);
+					basestats_nominal_nominal[1:6,rowid1] = as.matrix(list(a1,a2,chisq,df,pval,cramersv));
+					if ( k1 == 3 ) {
+						# ordinal-ordinal: additionally fill the ordinal-ordinal matrix
+						if (verbose == TRUE) print("[" + i + "," + j + "] ordinal-ordinal");
+						sp = bivar_oo(A1, A2, maxDomain);
+						basestats_ordinal_ordinal[1:3,rowid2] = as.matrix(list(a1,a2,sp));
+					}
+				}
+			}
+			else if (k1 == 1 | k2 == 1) {
+				# scale-categorical: bivar_sc takes the scale column first, so the arguments are swapped when S2 holds it
+				if (verbose == TRUE) print("[" + i + "," + j + "] scale-categorical");
+				if ( k1 == 1 )
+					[eta, f, pval, bw_ss, within_ss, bw_df, within_df, bw_mean_square, within_mean_square] = bivar_sc(A1, A2, maxDomain);
+				else
+					[eta, f, pval, bw_ss, within_ss, bw_df, within_df, bw_mean_square, within_mean_square] = bivar_sc(A2, A1, maxDomain);
+				basestats_nominal_scale[1:11,rowid1] = as.matrix(list(a1,a2,eta,f,pval,bw_ss,within_ss,bw_df,within_df,bw_mean_square,within_mean_square));
+			}
+			else {
+				# nominal-ordinal or ordinal-nominal
+				if (verbose == TRUE) print("[" + i + "," + j + "] categorical-categorical");
+				[chisq, df, pval, cramersv] = bivar_cc(A1, A2, maxDomain);
+				basestats_nominal_nominal[1:6,rowid1] = as.matrix(list(a1,a2,chisq,df,pval,cramersv));
+			}
+		}
+	}
+}
+
+
+bivar_cc = function(Matrix[Double] A, Matrix[Double] B, Double maxDomain)
+	return (Double chisq, Double df, Double pval, Double cramersv)
+{
+	# Chi-squared statistic, its degrees of freedom and p-value, and Cramer's V for two categorical columns A and B.
+	# maxDomain sizes the initial contingency table, which is then cut back to max(A) x max(B).
+
+	# observed counts
+	observed = table(A, B, maxDomain, maxDomain);
+	observed = observed[1:max(A), 1:max(B)];
+	numRowCats = nrow(observed);
+	numColCats = ncol(observed);
+
+	# expected counts: outer product of the row and column totals over the total count
+	total = sum(observed);
+	rowTotals = rowSums(observed);
+	colTotals = colSums(observed);
+	expected = (rowTotals %*% colTotals)/total;
+
+	# chi-squared statistic
+	chisq = sum((observed-expected)^2/expected);
+
+	# upper-tail p-value for the integer degrees of freedom, which are returned as a double
+	degFreedom = (numRowCats-1)*(numColCats-1);
+	pval = pchisq(target=chisq, df=degFreedom, lower.tail=FALSE);
+	df = as.double(degFreedom);
+
+	# Cramer's V, normalized by the smaller table dimension
+	smallerDim = min(numRowCats,numColCats);
+	cramersv = sqrt(chisq/(total*(smallerDim-1)));
+
+}
+
+
+bivar_ss = function(Matrix[Double] X, Matrix[Double] Y)
+	return (Double R, Double covXY, Double sigmaX, Double sigmaY)
+{
+	# Pearson's R of two scale columns, returned together with their covariance and standard deviations.
+
+	# standard deviations from the 2nd central moment, rescaled by n/(n-1)
+	n = nrow(X);
+	rescale = n/(n-1.0);
+	sigmaX = sqrt(moment(X,2) * rescale);
+	sigmaY = sqrt(moment(Y,2) * rescale);
+
+	# unweighted covariance
+	covXY = cov(X,Y);
+
+	R = covXY / (sigmaX*sigmaY);
+}
+
+
+# Eta and ANOVA F statistics between a scale column and a categorical column.
+# Y is the SCALE variable, A is the CATEGORICAL variable (callers pass them in this order).
+bivar_sc = function(Matrix[Double] Y, Matrix[Double] A, Double maxDomain)
+	return (Double Eta, Double AnovaF, Double pval, Double bw_ss, Double within_ss, Double bw_df, Double within_df, Double bw_mean_square, Double within_mean_square)
+{
+	# overall mean and variance of the scale variable
+	n = nrow(A);
+	grandMean = mean(Y);
+	varY = moment(Y,2) * n/(n-1.0)
+
+	# per-category frequencies, means and variances
+	groupCount = aggregate(target=Y, groups=A, fn="count", ngroups=maxDomain);
+	groupMean = aggregate(target=Y, groups=A, fn="mean", ngroups=maxDomain);
+	groupVar = aggregate(target=Y, groups=A, fn="variance", ngroups=maxDomain);
+
+	# NOTE(review): category count = aggregate output size; with ngroups=maxDomain presumably maxDomain, not the categories present in A - confirm
+	numGroups = nrow(groupCount);
+
+	# between-group sum of squares, degrees of freedom and mean square
+	bw_ss = sum( (groupCount*(groupMean-grandMean)^2) );
+	bw_df = as.double(numGroups-1);
+	bw_mean_square = bw_ss/bw_df;
+
+	# within-group sum of squares, degrees of freedom and mean square
+	within_ss = sum( (groupCount-1)*groupVar );
+	within_df = as.double(n-numGroups);
+	within_mean_square = within_ss/within_df;
+
+	Eta = sqrt(1 - ( within_ss / ((n-1)*varY) ));
+	AnovaF = bw_mean_square/within_mean_square;
+	pval = pf(target=AnovaF, df1=bw_df, df2=within_df, lower.tail=FALSE)
+}
+
+
+computeRanks = function(Matrix[Double] X) return (Matrix[Double] Ranks) {
+	# For per-category counts X, each category gets the midpoint of the rank interval its cases occupy.
+	Ranks = (cumsum(X) - X/2) + 1/2;
+}
+
+
+bivar_oo = function(Matrix[Double] A, Matrix[Double] B, Double maxDomain)
+	return (Double sp)
+{
+	# Correlation of two ordinal columns computed from their contingency table:
+	# categories are replaced by rank scores and the category counts act as weights.
+
+	# contingency table, cut back to max(A) x max(B)
+	counts = table(A, B, maxDomain, maxDomain);
+	counts = counts[1:max(A), 1:max(B)];
+	total = sum(counts); # total weight, or total #cases
+
+	# category-wise counts of both attributes as column vectors
+	countsA = rowSums(counts);
+	countsB = t(colSums(counts));
+
+	# rank scores per category
+	[scoreA] = computeRanks(countsA);
+	[scoreB] = computeRanks(countsB);
+
+	# weighted means of the scores
+	meanA = mean(scoreA,countsA);
+	meanB = mean(scoreB,countsB);
+
+	# weighted variances rescaled by total/(total-1); the covariance uses the matching (total-1) denominator
+	rescale = total/(total-1.0);
+	varA = moment(scoreA,countsA,2)*rescale;
+	varB = moment(scoreB,countsB,2)*rescale;
+	covAB = sum( t(counts/(total-1) * (scoreA-meanA)) * (scoreB-meanB) );
+
+	sp = covAB/(sqrt(varA)*sqrt(varB));
+}
diff --git a/src/main/java/org/apache/sysds/common/Builtins.java b/src/main/java/org/apache/sysds/common/Builtins.java
index 9ff8f78..f565124 100644
--- a/src/main/java/org/apache/sysds/common/Builtins.java
+++ b/src/main/java/org/apache/sysds/common/Builtins.java
@@ -53,6 +53,7 @@
BITWXOR("bitwXor", false),
BITWSHIFTL("bitwShiftL", false),
BITWSHIFTR("bitwShiftR", false),
+ BIVAR("bivar", true),
CAST_AS_SCALAR("as.scalar", "castAsScalar", false),
CAST_AS_MATRIX("as.matrix", false),
CAST_AS_FRAME("as.frame", false),
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
index c51254b..726198a 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
@@ -68,7 +68,7 @@
f.channel().closeFuture().sync();
}
catch (InterruptedException e) {
- log.error("Federated worker interrupted");
+ log.info("Federated worker interrupted");
}
finally {
log.info("Federated Worker Shutting down.");
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
index 0872c59..c8da781 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.Future;
import org.apache.log4j.Logger;
@@ -112,21 +113,6 @@
}
}
- public static DoubleObject aggMinMax(Future<FederatedResponse>[] ffr, boolean isMin, boolean isScalar) {
- try {
- double res = isMin ? Double.MAX_VALUE : - Double.MAX_VALUE;
- for (Future<FederatedResponse> fr: ffr){
- double v = isScalar ? ((ScalarObject)fr.get().getData()[0]).getDoubleValue() :
- isMin ? ((MatrixBlock) fr.get().getData()[0]).min() : ((MatrixBlock) fr.get().getData()[0]).max();
- res = isMin ? Math.min(res, v) : Math.max(res, v);
- }
- return new DoubleObject(res);
- }
- catch (Exception ex) {
- throw new DMLRuntimeException(ex);
- }
- }
-
public static MatrixBlock[] getResults(Future<FederatedResponse>[] ffr) {
try {
MatrixBlock[] ret = new MatrixBlock[ffr.length];
@@ -139,32 +125,61 @@
}
}
- public static MatrixBlock rbind(Future<FederatedResponse>[] ffr) {
+ public static MatrixBlock bind(Future<FederatedResponse>[] ffr, boolean cbind) {
// TODO handle non-contiguous cases
try {
MatrixBlock[] tmp = getResults(ffr);
return tmp[0].append(
Arrays.copyOfRange(tmp, 1, tmp.length),
- new MatrixBlock(), false);
+ new MatrixBlock(), cbind);
}
catch(Exception ex) {
throw new DMLRuntimeException(ex);
}
}
+	/**
+	 * Combines the min or max results returned by the federated workers.
+	 * <p>
+	 * Without a federation type (or with {@code OTHER}) all responses are reduced to a single value that is
+	 * returned as a 1x1 block. Otherwise the per-worker result vectors are folded element-wise, left to right,
+	 * and the last fetched block (modified in place) is returned.
+	 *
+	 * @param ffr      futures of the federated responses, one per worker
+	 * @param isMin    true to aggregate with min, false to aggregate with max
+	 * @param isScalar whether each response carries a scalar object instead of a matrix block (only read on the
+	 *                 single-value path)
+	 * @param fedType  federation type of the input, or empty if not applicable
+	 * @return the aggregated result as a matrix block
+	 * @throws DMLRuntimeException wrapping any failure while fetching or combining the responses
+	 */
+	public static MatrixBlock aggMinMax(Future<FederatedResponse>[] ffr, boolean isMin, boolean isScalar, Optional<FederationMap.FType> fedType) {
+		try {
+			// single-value path: start from the neutral element and fold in every worker's value
+			if (!fedType.isPresent() || fedType.get() == FederationMap.FType.OTHER) {
+				double acc = isMin ? Double.MAX_VALUE : -Double.MAX_VALUE;
+				for (Future<FederatedResponse> response : ffr) {
+					Object payload = response.get().getData()[0];
+					double cur;
+					if (isScalar)
+						cur = ((ScalarObject) payload).getDoubleValue();
+					else
+						cur = isMin ? ((MatrixBlock) payload).min() : ((MatrixBlock) payload).max();
+					acc = isMin ? Math.min(acc, cur) : Math.max(acc, cur);
+				}
+				return new MatrixBlock(1, 1, acc);
+			}
+
+			// vector path: COL walks down column 0 of each block, any other type walks along row 0
+			MatrixBlock[] parts = getResults(ffr);
+			boolean colFed = fedType.get() == FederationMap.FType.COL;
+			int len = colFed ? parts[0].getNumRows() : parts[0].getNumColumns();
+			for (int w = 1; w < ffr.length; w++)
+				for (int k = 0; k < len; k++) {
+					int r = colFed ? k : 0, c = colFed ? 0 : k;
+					double prev = parts[w - 1].getValue(r, c), cur = parts[w].getValue(r, c);
+					parts[w].setValue(r, c, isMin ? Double.min(prev, cur) : Double.max(prev, cur));
+				}
+			return parts[ffr.length - 1];
+		}
+		catch (Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+	}
+
public static ScalarObject aggScalar(AggregateUnaryOperator aop, Future<FederatedResponse>[] ffr) {
if(!(aop.aggOp.increOp.fn instanceof KahanFunction || (aop.aggOp.increOp.fn instanceof Builtin &&
- (((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
- ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)))) {
+ (((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
+ ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)))) {
throw new DMLRuntimeException("Unsupported aggregation operator: "
- + aop.aggOp.increOp.getClass().getSimpleName());
+ + aop.aggOp.increOp.getClass().getSimpleName());
}
try {
if(aop.aggOp.increOp.fn instanceof Builtin){
// then we know it is a Min or Max based on the previous check.
boolean isMin = ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN;
- return aggMinMax(ffr, isMin, true);
+ return new DoubleObject(aggMinMax(ffr, isMin, true, Optional.empty()).getValue(0,0));
}
else {
double sum = 0; //uak+
@@ -179,23 +194,21 @@
}
public static MatrixBlock aggMatrix(AggregateUnaryOperator aop, Future<FederatedResponse>[] ffr, FederationMap map) {
- // handle row aggregate
- if( aop.isRowAggregate() ) {
- //independent of aggregation function for row-partitioned federated matrices
- return rbind(ffr);
- }
- // handle col aggregate
- if( aop.aggOp.increOp.fn instanceof KahanFunction )
+ if (aop.isRowAggregate() && map.getType() == FederationMap.FType.ROW)
+ return bind(ffr, false);
+ else if (aop.isColAggregate() && map.getType() == FederationMap.FType.COL)
+ return bind(ffr, true);
+
+ if (aop.aggOp.increOp.fn instanceof KahanFunction)
return aggAdd(ffr);
else if( aop.aggOp.increOp.fn instanceof Mean )
return aggMean(ffr, map);
else if (aop.aggOp.increOp.fn instanceof Builtin &&
(((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
- ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)) {
+ ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)) {
boolean isMin = ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN;
- return new MatrixBlock(1,1,aggMinMax(ffr, isMin, false).getDoubleValue());
- }
- else
+ return aggMinMax(ffr,isMin,false, Optional.of(map.getType()));
+ } else
throw new DMLRuntimeException("Unsupported aggregation operator: "
+ aop.aggOp.increOp.fn.getClass().getSimpleName());
}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
index c28a163..bdb9784 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
@@ -84,7 +84,7 @@
FederatedRequest fr4 = mo1.getFedMapping().cleanup(getTID(), fr1.getID(), fr2.getID());
//execute federated operations and aggregate
Future<FederatedResponse>[] tmp = mo1.getFedMapping().execute(getTID(), fr1, fr2, fr3, fr4);
- MatrixBlock ret = FederationUtils.rbind(tmp);
+ MatrixBlock ret = FederationUtils.bind(tmp, false);
ec.setMatrixOutput(output.getName(), ret);
}
else { //MM
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
index b810d1f..ed9150a 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
@@ -19,6 +19,9 @@
package org.apache.sysds.test.functions.federated.algorithms;
+import java.util.Arrays;
+import java.util.Collection;
+
import org.apache.sysds.common.Types;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.util.HDFSTool;
@@ -26,20 +29,16 @@
import org.apache.sysds.test.TestConfiguration;
import org.apache.sysds.test.TestUtils;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.util.Arrays;
-import java.util.Collection;
-
@RunWith(value = Parameterized.class)
@net.jcip.annotations.NotThreadSafe
public class FederatedBivarTest extends AutomatedTestBase {
private final static String TEST_DIR = "functions/federated/";
private final static String TEST_NAME = "FederatedBivarTest";
- private final static String TEST_CLASS_DIR = TEST_DIR + FederatedUnivarTest.class.getSimpleName() + "/";
+ private final static String TEST_CLASS_DIR = TEST_DIR + FederatedBivarTest.class.getSimpleName() + "/";
private final static int blocksize = 1024;
@Parameterized.Parameter()
public int rows;
@@ -54,21 +53,15 @@
@Parameterized.Parameters
public static Collection<Object[]> data() {
- return Arrays.asList(new Object[][] {
- {10000, 10},
-// {2000, 50}, {1000, 10},
-// {10000, 10}, {2000, 50}, {1000, 100}
- });
+ return Arrays.asList(new Object[][] {{10000, 16}, {2000, 32}, {1000, 64}, {10000, 128}});
}
@Test
- @Ignore
public void federatedBivarSinglenode() {
federatedL2SVM(Types.ExecMode.SINGLE_NODE);
}
@Test
- @Ignore
public void federatedBivarHybrid() {
federatedL2SVM(Types.ExecMode.HYBRID);
}
@@ -80,27 +73,26 @@
String HOME = SCRIPT_DIR + TEST_DIR;
// write input matrices
- int quarterRows = rows / 4;
- // We have two matrices handled by a single federated worker
- double[][] X1 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 3);
- double[][] X2 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 7);
- double[][] X3 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 8);
- double[][] X4 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 9);
+ int quarterCols = cols / 4;
+
+ double[][] X1 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 3);
+ double[][] X2 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 7);
+ double[][] X3 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 8);
+ double[][] X4 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 9);
+
+ // generate attribute set
+ double[][] S1 = getRandomMatrix(1, (int) cols / 4, 1, cols, 1, 3);
+ TestUtils.floor(S1);
+ double[][] S2 = getRandomMatrix(1, (int) cols / 4, 1, cols, 1, 9);
+ TestUtils.floor(S2);
// write types matrix shape of (1, D)
- double [][] T1 = getRandomMatrix(1, cols, 0, 2, 1, 9);
- Arrays.stream(T1[0]).forEach(n -> Math.ceil(n));
+ double[][] T1 = getRandomMatrix(1, (int) cols / 4, 0, 2, 1, 9);
+ TestUtils.ceil(T1);
+ double[][] T2 = getRandomMatrix(1, (int) cols / 4, 0, 2, 1, 9);
+ TestUtils.ceil(T2);
- double [][] T2 = getRandomMatrix(1, cols, 0, 2, 1, 9);
- Arrays.stream(T2[0]).forEach(n -> Math.ceil(n));
-
- double [][] S1 = getRandomMatrix(1, (int) cols/5, 1, cols, 1, 3);
- Arrays.stream(S1[0]).forEach(n -> Math.ceil(n));
-
- double [][] S2 = getRandomMatrix(1, (int) cols/4, 1, cols, 1, 9);
- Arrays.stream(S2[0]).forEach(n -> Math.ceil(n));
-
- MatrixCharacteristics mc= new MatrixCharacteristics(quarterRows, cols, blocksize, quarterRows * cols);
+ MatrixCharacteristics mc = new MatrixCharacteristics(rows, quarterCols, blocksize, rows * quarterCols);
writeInputMatrixWithMTD("X1", X1, false, mc);
writeInputMatrixWithMTD("X2", X2, false, mc);
writeInputMatrixWithMTD("X3", X3, false, mc);
@@ -116,52 +108,34 @@
int port2 = getRandomAvailablePort();
int port3 = getRandomAvailablePort();
int port4 = getRandomAvailablePort();
- Thread t1 = startLocalFedWorkerThread(port1);
- Thread t2 = startLocalFedWorkerThread(port2);
- Thread t3 = startLocalFedWorkerThread(port3);
- Thread t4 = startLocalFedWorkerThread(port4);
+ Process t1 = startLocalFedWorker(port1);
+ Process t2 = startLocalFedWorker(port2);
+ Process t3 = startLocalFedWorker(port3);
+ Process t4 = startLocalFedWorker(port4);
TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
loadTestConfiguration(config);
-
// Run reference dml script with normal matrix
fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
- programArgs = new String[] {"-stats", "-args", input("X1"), input("X2"), input("X3"), input("X4"), input("S1"), input("S2"), input("T1"), input("T2"), expected("B")};
+ programArgs = new String[] {"-stats", "-args", input("X1"), input("X2"), input("X3"), input("X4"), input("S1"),
+ input("S2"), input("T1"), input("T2"), expected("B")};
runTest(true, false, null, -1);
// Run actual dml script with federated matrix
fullDMLScriptName = HOME + TEST_NAME + ".dml";
- programArgs = new String[] {"-stats", "-nvargs",
- "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+ programArgs = new String[] {"-stats", "-nvargs", "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
- "in_X4=" + TestUtils.federatedAddress(port4, input("X4")),
- "in_S1=" + input("S1"),
- "in_S2=" + input("S2"),
- "in_T1=" + input("T1"),
- "in_T2=" + input("T2"),
- "rows=" + rows, "cols=" + cols,
- "out=" + output("B")};
+ "in_X4=" + TestUtils.federatedAddress(port4, input("X4")), "in_S1=" + input("S1"), "in_S2=" + input("S2"),
+ "in_T1=" + input("T1"), "in_T2=" + input("T2"), "rows=" + rows, "cols=" + cols, "out=" + output("B")};
runTest(true, false, null, -1);
// compare via files
-// compareResults(1e-9);
+ compareResults(1e-9);
TestUtils.shutdownThreads(t1, t2, t3, t4);
- // check for federated operations
-// Assert.assertTrue(heavyHittersContainsString("fed_ba+*"));
-// Assert.assertTrue(heavyHittersContainsString("fed_uack+"));
-// Assert.assertTrue(heavyHittersContainsString("fed_tsmm"));
-// if( scaleAndShift ) {
-// Assert.assertTrue(heavyHittersContainsString("fed_uacsqk+"));
-// Assert.assertTrue(heavyHittersContainsString("fed_uacmean"));
-// Assert.assertTrue(heavyHittersContainsString("fed_-"));
-// Assert.assertTrue(heavyHittersContainsString("fed_/"));
-// Assert.assertTrue(heavyHittersContainsString("fed_replace"));
-// }
-
- //check that federated input files are still existing
+ // check that federated input files are still existing
Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java
new file mode 100644
index 0000000..00f0f50
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullAggregateTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.primitives;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.lops.LopProperties.ExecType;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests the full sum, mean, max and min aggregates over a matrix served by four local federated workers,
+ * with row- or column-wise split inputs, against a reference script run on the plain input matrices.
+ */
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedFullAggregateTest extends AutomatedTestBase {
+	private final static String TEST_NAME1 = "FederatedSumTest";
+	private final static String TEST_NAME2 = "FederatedMeanTest";
+	private final static String TEST_NAME3 = "FederatedMaxTest";
+	private final static String TEST_NAME4 = "FederatedMinTest";
+
+	private final static String TEST_DIR = "functions/federated/aggregate/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + FederatedFullAggregateTest.class.getSimpleName() + "/";
+
+	private final static int blocksize = 1024;
+	@Parameterized.Parameter()
+	public int rows;
+	@Parameterized.Parameter(1)
+	public int cols;
+	@Parameterized.Parameter(2)
+	public boolean rowPartitioned;
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		return Arrays.asList(
+			new Object[][] {{10, 1000, false}, {100, 4, false}, {36, 1000, true}, {1000, 10, true}, {4, 100, true}});
+	}
+
+	private enum OpType {
+		SUM, MEAN, MAX, MIN
+	}
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"S.scalar"}));
+		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"S.scalar"}));
+		addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {"S.scalar"}));
+		addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {"S.scalar"}));
+	}
+
+	@Test
+	public void testSumDenseMatrixCP() {
+		runAggregateOperationTest(OpType.SUM, ExecType.CP);
+	}
+
+	@Test
+	public void testMeanDenseMatrixCP() {
+		runAggregateOperationTest(OpType.MEAN, ExecType.CP);
+	}
+
+	@Test
+	public void testMaxDenseMatrixCP() {
+		runAggregateOperationTest(OpType.MAX, ExecType.CP);
+	}
+
+	@Test
+	public void testMinDenseMatrixCP() {
+		runAggregateOperationTest(OpType.MIN, ExecType.CP);
+	}
+
+	@Test
+	public void testSumDenseMatrixSP() {
+		runAggregateOperationTest(OpType.SUM, ExecType.SPARK);
+	}
+
+	@Test
+	public void testMeanDenseMatrixSP() {
+		runAggregateOperationTest(OpType.MEAN, ExecType.SPARK);
+	}
+
+	@Test
+	public void testMaxDenseMatrixSP() {
+		runAggregateOperationTest(OpType.MAX, ExecType.SPARK);
+	}
+
+	@Test
+	public void testMinDenseMatrixSP() {
+		runAggregateOperationTest(OpType.MIN, ExecType.SPARK);
+	}
+
+	/**
+	 * Runs the reference and the federated script for one aggregate type and compares their results.
+	 * The workers are shut down and the execution mode is restored even if a step or assertion fails.
+	 */
+	private void runAggregateOperationTest(OpType type, ExecType instType) {
+		ExecMode platformOld = rtplatform;
+		rtplatform = (instType == ExecType.SPARK) ? ExecMode.SPARK : ExecMode.HYBRID;
+		if(rtplatform == ExecMode.SPARK)
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+		try {
+			String TEST_NAME = null;
+			switch(type) {
+				case SUM:
+					TEST_NAME = TEST_NAME1;
+					break;
+				case MEAN:
+					TEST_NAME = TEST_NAME2;
+					break;
+				case MAX:
+					TEST_NAME = TEST_NAME3;
+					break;
+				case MIN:
+					TEST_NAME = TEST_NAME4;
+					break;
+			}
+
+			getAndLoadTestConfiguration(TEST_NAME);
+			String HOME = SCRIPT_DIR + TEST_DIR;
+
+			// write input matrices: each of the four parts gets a quarter of the rows or of the columns
+			int r = rowPartitioned ? rows / 4 : rows;
+			int c = rowPartitioned ? cols : cols / 4;
+
+			double[][] X1 = getRandomMatrix(r, c, 1, 5, 1, 3);
+			double[][] X2 = getRandomMatrix(r, c, 1, 5, 1, 7);
+			double[][] X3 = getRandomMatrix(r, c, 1, 5, 1, 8);
+			double[][] X4 = getRandomMatrix(r, c, 1, 5, 1, 9);
+
+			MatrixCharacteristics mc = new MatrixCharacteristics(r, c, blocksize, r * c);
+			writeInputMatrixWithMTD("X1", X1, false, mc);
+			writeInputMatrixWithMTD("X2", X2, false, mc);
+			writeInputMatrixWithMTD("X3", X3, false, mc);
+			writeInputMatrixWithMTD("X4", X4, false, mc);
+
+			// empty script name because we don't execute any script, just start the worker
+			fullDMLScriptName = "";
+			int port1 = getRandomAvailablePort();
+			int port2 = getRandomAvailablePort();
+			int port3 = getRandomAvailablePort();
+			int port4 = getRandomAvailablePort();
+			Thread t1 = startLocalFedWorkerThread(port1);
+			Thread t2 = startLocalFedWorkerThread(port2);
+			Thread t3 = startLocalFedWorkerThread(port3);
+			Thread t4 = startLocalFedWorkerThread(port4);
+
+			try {
+				TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+				loadTestConfiguration(config);
+
+				// Run reference dml script with normal matrix
+				fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+				programArgs = new String[] {"-stats", "100", "-args", input("X1"), input("X2"), input("X3"), input("X4"),
+					expected("S"), Boolean.toString(rowPartitioned).toUpperCase()};
+				runTest(true, false, null, -1);
+
+				// Run actual dml script with federated matrix
+				fullDMLScriptName = HOME + TEST_NAME + ".dml";
+				programArgs = new String[] {"-stats", "100", "-nvargs",
+					"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+					"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
+					"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
+					"in_X4=" + TestUtils.federatedAddress(port4, input("X4")), "rows=" + rows, "cols=" + cols,
+					"rP=" + Boolean.toString(rowPartitioned).toUpperCase(), "out_S=" + output("S")};
+				runTest(true, false, null, -1);
+
+				// compare via files
+				compareResults(1e-9);
+
+				switch(type) {
+					case SUM:
+						Assert.assertTrue(heavyHittersContainsString("fed_uak+"));
+						break;
+					case MEAN:
+						Assert.assertTrue(heavyHittersContainsString("fed_uamean"));
+						break;
+					case MAX:
+						Assert.assertTrue(heavyHittersContainsString("fed_uamax"));
+						break;
+					case MIN:
+						Assert.assertTrue(heavyHittersContainsString("fed_uamin"));
+						break;
+				}
+
+				// check that federated input files are still existing
+				Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+				Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+				Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+				Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+			}
+			finally {
+				TestUtils.shutdownThreads(t1, t2, t3, t4);
+			}
+		}
+		finally {
+			resetExecMode(platformOld);
+		}
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java
new file mode 100644
index 0000000..b1c3991
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRowColAggregateTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.primitives;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedRowColAggregateTest extends AutomatedTestBase {
+ private final static String TEST_NAME1 = "FederatedColSumTest";
+ private final static String TEST_NAME2 = "FederatedColMeanTest";
+ private final static String TEST_NAME3 = "FederatedColMaxTest";
+ private final static String TEST_NAME4 = "FederatedColMinTest";
+ private final static String TEST_NAME5 = "FederatedRowSumTest";
+ private final static String TEST_NAME6 = "FederatedRowMeanTest";
+ private final static String TEST_NAME7 = "FederatedRowMaxTest";
+ private final static String TEST_NAME8 = "FederatedRowMinTest";
+
+ private final static String TEST_DIR = "functions/federated/aggregate/";
+ private static final String TEST_CLASS_DIR = TEST_DIR + FederatedRowColAggregateTest.class.getSimpleName() + "/";
+
+ private final static int blocksize = 1024;
+ @Parameterized.Parameter()
+ public int rows;
+ @Parameterized.Parameter(1)
+ public int cols;
+ @Parameterized.Parameter(2)
+ public boolean rowPartitioned;
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() {
+ return Arrays.asList(
+ new Object[][] {{10, 1000, false}, // each entry is {rows, cols, rowPartitioned}
+ // NOTE(review): disabled sets below, so rowPartitioned=true is never exercised - confirm intent: {100, 4, false}, {36, 1000, true}, {1000, 10, true}, {4, 100, true}
+ });
+ }
+
+ private enum OpType {
+ SUM, MEAN, MAX, MIN
+ }
+
+ private enum InstType {
+ ROW, COL
+ }
+
+ @Override
+ public void setUp() {
+ TestUtils.clearAssertionInformation();
+ addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"S"}));
+ addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"S"}));
+ addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {"S"}));
+ addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {"S"}));
+ addTestConfiguration(TEST_NAME5, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME5, new String[] {"S"}));
+ addTestConfiguration(TEST_NAME6, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME6, new String[] {"S"}));
+ addTestConfiguration(TEST_NAME7, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME7, new String[] {"S"}));
+ addTestConfiguration(TEST_NAME8, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME8, new String[] {"S"}));
+ }
+
+ @Test
+ public void testColSumDenseMatrixCP() {
+ runAggregateOperationTest(OpType.SUM, InstType.COL, ExecMode.SINGLE_NODE);
+ }
+
+ @Test
+ public void testColMeanDenseMatrixCP() {
+ runAggregateOperationTest(OpType.MEAN, InstType.COL, ExecMode.SINGLE_NODE);
+ }
+
+ @Test
+ public void testColMaxDenseMatrixCP() {
+ runAggregateOperationTest(OpType.MAX, InstType.COL, ExecMode.SINGLE_NODE);
+ }
+
+ @Test
+ public void testRowSumDenseMatrixCP() {
+ runAggregateOperationTest(OpType.SUM, InstType.ROW, ExecMode.SINGLE_NODE);
+ }
+
+ @Test
+ public void testRowMeanDenseMatrixCP() {
+ runAggregateOperationTest(OpType.MEAN, InstType.ROW, ExecMode.SINGLE_NODE);
+ }
+
+ @Test
+ public void testRowMaxDenseMatrixCP() {
+ runAggregateOperationTest(OpType.MAX, InstType.ROW, ExecMode.SINGLE_NODE);
+ }
+
+ @Test
+ public void testRowMinDenseMatrixCP() {
+ runAggregateOperationTest(OpType.MIN, InstType.ROW, ExecMode.SINGLE_NODE);
+ }
+
+ @Test
+ public void testColMinDenseMatrixCP() {
+ runAggregateOperationTest(OpType.MIN, InstType.COL, ExecMode.SINGLE_NODE);
+ }
+
+	/**
+	 * Writes four input partitions, starts four local federated workers, runs the reference and the
+	 * federated script, compares both results and checks for the expected federated opcode.
+	 *
+	 * @param type     aggregate under test
+	 * @param instr    aggregation direction (row-wise or column-wise)
+	 * @param execMode value assigned to rtplatform for the script runs
+	 */
+	private void runAggregateOperationTest(OpType type, InstType instr, ExecMode execMode) {
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		ExecMode platformOld = rtplatform;
+		if(rtplatform == ExecMode.SPARK)
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+		// select the script and the opcode suffix that belong to the requested aggregate
+		boolean colAgg = instr == InstType.COL;
+		String TEST_NAME, opSuffix;
+		switch(type) {
+			case SUM:
+				TEST_NAME = colAgg ? TEST_NAME1 : TEST_NAME5;
+				opSuffix = "k+";
+				break;
+			case MEAN:
+				TEST_NAME = colAgg ? TEST_NAME2 : TEST_NAME6;
+				opSuffix = "mean";
+				break;
+			case MAX:
+				TEST_NAME = colAgg ? TEST_NAME3 : TEST_NAME7;
+				opSuffix = "max";
+				break;
+			case MIN:
+				TEST_NAME = colAgg ? TEST_NAME4 : TEST_NAME8;
+				opSuffix = "min";
+				break;
+			default:
+				throw new IllegalArgumentException("Unsupported aggregate: " + type);
+		}
+
+		getAndLoadTestConfiguration(TEST_NAME);
+		String HOME = SCRIPT_DIR + TEST_DIR;
+
+		// write input matrices
+		int r = rows;
+		int c = cols / 4;
+		if(rowPartitioned) {
+			r = rows / 4;
+			c = cols;
+		}
+
+		double[][] X1 = getRandomMatrix(r, c, 1, 5, 1, 3);
+		double[][] X2 = getRandomMatrix(r, c, 1, 5, 1, 7);
+		double[][] X3 = getRandomMatrix(r, c, 1, 5, 1, 8);
+		double[][] X4 = getRandomMatrix(r, c, 1, 5, 1, 9);
+
+		MatrixCharacteristics mc = new MatrixCharacteristics(r, c, blocksize, r * c);
+		writeInputMatrixWithMTD("X1", X1, false, mc);
+		writeInputMatrixWithMTD("X2", X2, false, mc);
+		writeInputMatrixWithMTD("X3", X3, false, mc);
+		writeInputMatrixWithMTD("X4", X4, false, mc);
+
+		// empty script name because we don't execute any script, just start the worker
+		fullDMLScriptName = "";
+		int port1 = getRandomAvailablePort();
+		int port2 = getRandomAvailablePort();
+		int port3 = getRandomAvailablePort();
+		int port4 = getRandomAvailablePort();
+		Thread t1 = startLocalFedWorkerThread(port1);
+		Thread t2 = startLocalFedWorkerThread(port2);
+		Thread t3 = startLocalFedWorkerThread(port3);
+		Thread t4 = startLocalFedWorkerThread(port4);
+
+		// workers are running from here on: always stop them and restore the global settings
+		try {
+			rtplatform = execMode;
+			if(rtplatform == ExecMode.SPARK)
+				DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+			TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+			loadTestConfiguration(config);
+
+			// Run reference dml script with normal matrix
+			fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+			programArgs = new String[] {"-stats", "100", "-args", input("X1"), input("X2"), input("X3"), input("X4"),
+				expected("S"), Boolean.toString(rowPartitioned).toUpperCase()};
+			runTest(true, false, null, -1);
+
+			// Run actual dml script with federated matrix
+			fullDMLScriptName = HOME + TEST_NAME + ".dml";
+			programArgs = new String[] {"-stats", "100", "-nvargs",
+				"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+				"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
+				"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
+				"in_X4=" + TestUtils.federatedAddress(port4, input("X4")), "rows=" + rows, "cols=" + cols,
+				"rP=" + Boolean.toString(rowPartitioned).toUpperCase(), "out_S=" + output("S")};
+			runTest(true, false, null, -1);
+
+			// compare via files
+			compareResults(1e-9);
+
+			// the heavy hitters must list the federated opcode for this direction and aggregate
+			String fedInst = colAgg ? "fed_uac" : "fed_uar";
+			Assert.assertTrue(heavyHittersContainsString(fedInst.concat(opSuffix)));
+
+			// check that federated input files are still existing
+			Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+			Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+			Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+			Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+		}
+		finally {
+			TestUtils.shutdownThreads(t1, t2, t3, t4);
+			rtplatform = platformOld;
+			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+		}
+	}
+}
diff --git a/src/test/scripts/functions/federated/FederatedBivarTest.dml b/src/test/scripts/functions/federated/FederatedBivarTest.dml
index 94b197a..9a0ba4a 100644
--- a/src/test/scripts/functions/federated/FederatedBivarTest.dml
+++ b/src/test/scripts/functions/federated/FederatedBivarTest.dml
@@ -20,12 +20,10 @@
#-------------------------------------------------------------
X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
- ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
- list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2), list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
S1 = read($in_S1);
S2 = read($in_S2);
T1 = read($in_T1);
T2 = read($in_T2);
-B = bivar(X=X, S1=S1, S2=S2, T1=T1, T2=T2);
+B = bivar(X=X, S1=S1, S2=S2, T1=T1, T2=T2, verbose=FALSE);
write(B, $out);
-
diff --git a/src/test/scripts/functions/federated/FederatedBivarTestReference.dml b/src/test/scripts/functions/federated/FederatedBivarTestReference.dml
index b28347c..565b941 100644
--- a/src/test/scripts/functions/federated/FederatedBivarTestReference.dml
+++ b/src/test/scripts/functions/federated/FederatedBivarTestReference.dml
@@ -19,10 +19,10 @@
#
#-------------------------------------------------------------
-X = rbind(read($1), read($2), read($3), read($4));
+X = cbind(read($1), read($2), read($3), read($4));
S1 = read($5);
S2 = read($6);
T1 = read($7);
T2 = read($8);
-B = bivar(X=X, S1=S1, S2=S2, T1=T1, T2=T2);
-write(B, $9);
\ No newline at end of file
+B = bivar(X=X, S1=S1, S2=S2, T1=T1, T2=T2, verbose=FALSE);
+write(B, $9);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedColMaxTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedColMaxTest.dml
new file mode 100644
index 0000000..85e2ce6
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedColMaxTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+ list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+# A is declared over four addresses; its ranges advance in quarters of $rows when $rP is true, else in quarters of $cols.
+s = colMaxs(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedColMaxTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedColMaxTestReference.dml
new file mode 100644
index 0000000..7a51647
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedColMaxTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+# A is assembled locally from inputs $1..$4 (rbind when $6 is true, cbind otherwise); colMaxs(A) is written to $5.
+s = colMaxs(A);
+
+write(s, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedColMeanTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedColMeanTest.dml
new file mode 100644
index 0000000..f455b62
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedColMeanTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+ list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+# A is declared over four addresses; its ranges advance in quarters of $rows when $rP is true, else in quarters of $cols.
+s = colMeans(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedColMeanTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedColMeanTestReference.dml
new file mode 100644
index 0000000..934fee5
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedColMeanTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+# A is assembled locally from inputs $1..$4 (rbind when $6 is true, cbind otherwise); colMeans(A) is written to $5.
+s = colMeans(A);
+
+write(s, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedColMinTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedColMinTest.dml
new file mode 100644
index 0000000..db19f0e
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedColMinTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+ list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+# A is declared over four addresses; its ranges advance in quarters of $rows when $rP is true, else in quarters of $cols.
+s = colMins(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedColMinTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedColMinTestReference.dml
new file mode 100644
index 0000000..ba3c967
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedColMinTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+# A is assembled locally from inputs $1..$4 (rbind when $6 is true, cbind otherwise); colMins(A) is written to $5.
+s = colMins(A);
+
+write(s, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedColSumTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedColSumTest.dml
new file mode 100644
index 0000000..d5cf9b8
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedColSumTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+ list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+# A is declared over four addresses; its ranges advance in quarters of $rows when $rP is true, else in quarters of $cols.
+s = colSums(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedColSumTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedColSumTestReference.dml
new file mode 100644
index 0000000..aba56e8
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedColSumTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+# A is assembled locally from inputs $1..$4 (rbind when $6 is true, cbind otherwise); colSums(A) is written to $5.
+s = colSums(A);
+
+write(s, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedMaxTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedMaxTest.dml
new file mode 100644
index 0000000..c5c209f
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedMaxTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+ list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+# A is declared over four addresses; its ranges advance in quarters of $rows when $rP is true, else in quarters of $cols.
+s = max(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedMaxTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedMaxTestReference.dml
new file mode 100644
index 0000000..887cd3c
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedMaxTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+# A is assembled locally from inputs $1..$4 (rbind when $6 is true, cbind otherwise); max(A) is written to $5.
+s = max(A);
+
+write(s, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedMeanTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedMeanTest.dml
new file mode 100644
index 0000000..22d3f0b
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedMeanTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+ list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+# A is declared over four addresses; its ranges advance in quarters of $rows when $rP is true, else in quarters of $cols.
+s = mean(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedMeanTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedMeanTestReference.dml
new file mode 100644
index 0000000..8566399
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedMeanTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+# A is assembled locally from inputs $1..$4 (rbind when $6 is true, cbind otherwise); mean(A) is written to $5.
+s = mean(A);
+
+write(s, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedMinTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedMinTest.dml
new file mode 100644
index 0000000..990cd3a
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedMinTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+ list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+# A is declared over four addresses; its ranges advance in quarters of $rows when $rP is true, else in quarters of $cols.
+s = min(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedMinTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedMinTestReference.dml
new file mode 100644
index 0000000..353fa4d
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedMinTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Local reference: load the four parts ($1-$4), stack them by rows if $6 holds, else by columns.
+P1 = read($1); P2 = read($2); P3 = read($3); P4 = read($4);
+if ($6) { X = rbind(P1, P2, P3, P4); }
+else { X = cbind(P1, P2, P3, P4); }
+res = min(X); # minimum over the reassembled matrix
+write(res, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedRowMaxTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedRowMaxTest.dml
new file mode 100644
index 0000000..76af48a
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedRowMaxTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) { # $rP set: the four workers cover consecutive row blocks bounded at k*$rows/4, each spanning all $cols columns
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else { # otherwise: the workers cover consecutive column blocks, each spanning all $rows rows
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+ list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+# Row-wise maximum of the federated matrix A; the result is written to $out_S below.
+s = rowMaxs(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedRowMaxTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedRowMaxTestReference.dml
new file mode 100644
index 0000000..485ad13
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedRowMaxTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Local reference: load the four parts ($1-$4), stack them by rows if $6 holds, else by columns.
+P1 = read($1); P2 = read($2); P3 = read($3); P4 = read($4);
+if ($6) { X = rbind(P1, P2, P3, P4); }
+else { X = cbind(P1, P2, P3, P4); }
+res = rowMaxs(X); # row-wise maximum of the reassembled matrix
+write(res, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedRowMeanTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedRowMeanTest.dml
new file mode 100644
index 0000000..1dac255
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedRowMeanTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) { # $rP set: the four workers cover consecutive row blocks bounded at k*$rows/4, each spanning all $cols columns
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else { # otherwise: the workers cover consecutive column blocks, each spanning all $rows rows
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+ list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+# Row-wise mean of the federated matrix A; the result is written to $out_S below.
+s = rowMeans(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedRowMeanTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedRowMeanTestReference.dml
new file mode 100644
index 0000000..fd21323
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedRowMeanTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Local reference: load the four parts ($1-$4), stack them by rows if $6 holds, else by columns.
+P1 = read($1); P2 = read($2); P3 = read($3); P4 = read($4);
+if ($6) { X = rbind(P1, P2, P3, P4); }
+else { X = cbind(P1, P2, P3, P4); }
+res = rowMeans(X); # row-wise mean of the reassembled matrix
+write(res, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedRowMinTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedRowMinTest.dml
new file mode 100644
index 0000000..11932df
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedRowMinTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) { # $rP set: the four workers cover consecutive row blocks bounded at k*$rows/4, each spanning all $cols columns
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else { # otherwise: the workers cover consecutive column blocks, each spanning all $rows rows
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+ list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+# Row-wise minimum of the federated matrix A; the result is written to $out_S below.
+s = rowMins(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedRowMinTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedRowMinTestReference.dml
new file mode 100644
index 0000000..b4b050a
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedRowMinTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Local reference: load the four parts ($1-$4), stack them by rows if $6 holds, else by columns.
+P1 = read($1); P2 = read($2); P3 = read($3); P4 = read($4);
+if ($6) { X = rbind(P1, P2, P3, P4); }
+else { X = cbind(P1, P2, P3, P4); }
+res = rowMins(X); # row-wise minimum of the reassembled matrix
+write(res, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedRowSumTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedRowSumTest.dml
new file mode 100644
index 0000000..2f39039
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedRowSumTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) { # $rP set: the four workers cover consecutive row blocks bounded at k*$rows/4, each spanning all $cols columns
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else { # otherwise: the workers cover consecutive column blocks, each spanning all $rows rows
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+ list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+# Row-wise sum of the federated matrix A; the result is written to $out_S below.
+s = rowSums(A);
+
+write(s, $out_S);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedRowSumTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedRowSumTestReference.dml
new file mode 100644
index 0000000..1dab2a5
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedRowSumTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Local reference: load the four parts ($1-$4), stack them by rows if $6 holds, else by columns.
+P1 = read($1); P2 = read($2); P3 = read($3); P4 = read($4);
+if ($6) { X = rbind(P1, P2, P3, P4); }
+else { X = cbind(P1, P2, P3, P4); }
+res = rowSums(X); # row-wise sum of the reassembled matrix
+write(res, $5);
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedSumTest.dml b/src/test/scripts/functions/federated/aggregate/FederatedSumTest.dml
new file mode 100644
index 0000000..9de439e
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedSumTest.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) { # $rP set: the four workers cover consecutive row blocks bounded at k*$rows/4, each spanning all $cols columns
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else { # otherwise: the workers cover consecutive column blocks, each spanning all $rows rows
+ A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+ list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+# Sum over the whole federated matrix A; the result is written to $out_S below.
+s = sum(A);
+
+write(s, $out_S);
\ No newline at end of file
diff --git a/src/test/scripts/functions/federated/aggregate/FederatedSumTestReference.dml b/src/test/scripts/functions/federated/aggregate/FederatedSumTestReference.dml
new file mode 100644
index 0000000..d7d3540
--- /dev/null
+++ b/src/test/scripts/functions/federated/aggregate/FederatedSumTestReference.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Local reference: load the four parts ($1-$4), stack them by rows if $6 holds, else by columns.
+P1 = read($1); P2 = read($2); P3 = read($3); P4 = read($4);
+if ($6) { X = rbind(P1, P2, P3, P4); }
+else { X = cbind(P1, P2, P3, P4); }
+res = sum(X); # sum over the reassembled matrix
+write(res, $5);