[SYSTEMDS-2549,2631] Federated min, max and Binary elm-w Upgrade

SYSTEMDS-2549 Federated Binary Element-wise operations
- Supports now aligned federated matrices both MV and MM

The min and max operations are total min max, not column or row based.

This commit also contains tests for Federated L2SVM with
distributed labels.
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
index 9a14aba..027c577 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
@@ -19,17 +19,20 @@
 
 package org.apache.sysds.runtime.controlprogram.federated;
 
-
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Future;
 
+import org.apache.log4j.Logger;
 import org.apache.sysds.common.Types.ExecType;
 import org.apache.sysds.lops.Lop;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
 import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
+import org.apache.sysds.runtime.functionobjects.Builtin;
+import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode;
 import org.apache.sysds.runtime.functionobjects.KahanFunction;
+import org.apache.sysds.runtime.functionobjects.KahanPlus;
 import org.apache.sysds.runtime.functionobjects.Mean;
 import org.apache.sysds.runtime.functionobjects.Plus;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
@@ -43,6 +46,7 @@
 import org.apache.sysds.runtime.matrix.operators.SimpleOperator;
 
 public class FederationUtils {
+	protected static Logger log = Logger.getLogger(FederationUtils.class);
 	private static final IDSequence _idSeq = new IDSequence();
 	
 	public static void resetFedDataID() {
@@ -132,16 +136,31 @@
 	}
 
 	public static ScalarObject aggScalar(AggregateUnaryOperator aop, Future<FederatedResponse>[] ffr) {
-		if( !(aop.aggOp.increOp.fn instanceof KahanFunction) ) {
+		if(!(aop.aggOp.increOp.fn instanceof KahanPlus || (aop.aggOp.increOp.fn instanceof Builtin &&
+			(((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
+				((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)))) {
 			throw new DMLRuntimeException("Unsupported aggregation operator: "
 				+ aop.aggOp.increOp.getClass().getSimpleName());
 		}
-		//compute scalar sum of partial aggregates
+
 		try {
-			double sum = 0; //uak+, uasqk+
-			for( Future<FederatedResponse> fr : ffr )
-				sum += ((ScalarObject)fr.get().getData()[0]).getDoubleValue();
-			return new DoubleObject(sum);
+			if(aop.aggOp.increOp.fn instanceof Builtin){
+				// then we know it is a Min or Max based on the previous check.
+				boolean isMin = ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN;
+				double res = isMin ? Double.MAX_VALUE: - Double.MAX_VALUE;
+				double v;
+				for (Future<FederatedResponse> fr: ffr){
+					v = ((ScalarObject)fr.get().getData()[0]).getDoubleValue();
+					res = isMin ? Math.min(res, v) : Math.max(res, v);
+				}
+				return new DoubleObject(res);
+			} 
+			else {		
+				double sum = 0; //uak+
+				for( Future<FederatedResponse> fr : ffr )
+					sum += ((ScalarObject)fr.get().getData()[0]).getDoubleValue();
+				return new DoubleObject(sum);
+			}
 		}
 		catch(Exception ex) {
 			throw new DMLRuntimeException(ex);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryMatrixMatrixFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryMatrixMatrixFEDInstruction.java
index 7166373..63c2d71 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryMatrixMatrixFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryMatrixMatrixFEDInstruction.java
@@ -39,29 +39,41 @@
 		MatrixObject mo1 = ec.getMatrixObject(input1);
 		MatrixObject mo2 = ec.getMatrixObject(input2);
 		
+		FederatedRequest fr2 = null;
+
 		if( mo2.isFederated() ) {
-			throw new DMLRuntimeException("Matrix-matrix binary operations "
-				+ " with a federated right input are not supported yet.");
+			if(mo1.isFederated() && mo1.getFedMapping().isAligned(mo2.getFedMapping(), false)){
+				fr2 = FederationUtils.callInstruction(instString, output, new CPOperand[]{input1, input2},
+					new long[]{mo1.getFedMapping().getID(), mo2.getFedMapping().getID()});
+				mo1.getFedMapping().execute(getTID(), true, fr2);
+				
+			} else{
+				throw new DMLRuntimeException("Matrix-matrix binary operations "
+					+ " with a federated right input are not supported yet.");
+			}
+
+		} 
+		else {
+			//matrix-matrix binary oFederatedRequest fr2 = null;perations -> lhs fed input -> fed output
+			
+			if(mo2.getNumRows() > 1 && mo2.getNumColumns() == 1 ) { //MV row vector
+				FederatedRequest[] fr1 = mo1.getFedMapping().broadcastSliced(mo2, false);
+				fr2 = FederationUtils.callInstruction(instString, output, new CPOperand[]{input1, input2},
+					new long[]{mo1.getFedMapping().getID(), fr1[0].getID()});
+				//execute federated instruction and cleanup intermediates
+				mo1.getFedMapping().execute(getTID(), true, fr1, fr2);
+				mo1.getFedMapping().cleanup(getTID(), fr1[0].getID());
+			}
+			else { //MM or MV col vector
+				FederatedRequest fr1 = mo1.getFedMapping().broadcast(mo2);
+				fr2 = FederationUtils.callInstruction(instString, output, new CPOperand[]{input1, input2},
+					new long[]{mo1.getFedMapping().getID(), fr1.getID()});
+				//execute federated instruction and cleanup intermediates
+				mo1.getFedMapping().execute(getTID(), true, fr1, fr2);
+				mo1.getFedMapping().cleanup(getTID(), fr1.getID());
+			}
 		}
 		
-		//matrix-matrix binary operations -> lhs fed input -> fed output
-		FederatedRequest fr2 = null;
-		if(mo2.getNumRows() > 1 && mo2.getNumColumns() == 1 ) { //MV row vector
-			FederatedRequest[] fr1 = mo1.getFedMapping().broadcastSliced(mo2, false);
-			fr2 = FederationUtils.callInstruction(instString, output, new CPOperand[]{input1, input2},
-				new long[]{mo1.getFedMapping().getID(), fr1[0].getID()});
-			//execute federated instruction and cleanup intermediates
-			mo1.getFedMapping().execute(getTID(), true, fr1, fr2);
-			mo1.getFedMapping().cleanup(getTID(), fr1[0].getID());
-		}
-		else { //MM or MV col vector
-			FederatedRequest fr1 = mo1.getFedMapping().broadcast(mo2);
-			fr2 = FederationUtils.callInstruction(instString, output, new CPOperand[]{input1, input2},
-				new long[]{mo1.getFedMapping().getID(), fr1.getID()});
-			//execute federated instruction and cleanup intermediates
-			mo1.getFedMapping().execute(getTID(), true, fr1, fr2);
-			mo1.getFedMapping().cleanup(getTID(), fr1.getID());
-		}
 		
 		//derive new fed mapping for output
 		MatrixObject out = ec.getMatrixObject(output);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedBinaryMatrixTest.java b/src/test/java/org/apache/sysds/test/functions/federated/FederatedBinaryMatrixTest.java
new file mode 100644
index 0000000..bb8ae52
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/FederatedBinaryMatrixTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedBinaryMatrixTest extends AutomatedTestBase {
+
+	private final static String TEST_DIR = "functions/federated/";
+	private final static String TEST_NAME = "FederatedBinaryMatrixTest";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedBinaryMatrixTest.class.getSimpleName() + "/";
+
+	private final static int blocksize = 1024;
+	@Parameterized.Parameter()
+	public int rows;
+	@Parameterized.Parameter(1)
+	public int cols;
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"Z"}));
+	}
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		// rows have to be even and > 1
+		return Arrays.asList(new Object[][] {
+            {2, 1000}, 
+            {10, 100}, {100, 10}, {1000, 1}, {10, 2000}, {2000, 10}
+        });
+	}
+
+	@Test
+	public void federatedMultiplyCP() {
+		federatedMultiply(Types.ExecMode.SINGLE_NODE);
+	}
+
+	/*
+	 * FIXME spark execution mode support
+	 * 
+	 * @Test public void federatedMultiplySP() { federatedMultiply(Types.ExecMode.SPARK); }
+	 */
+
+	public void federatedMultiply(Types.ExecMode execMode) {
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		Types.ExecMode platformOld = rtplatform;
+		rtplatform = execMode;
+		if(rtplatform == Types.ExecMode.SPARK) {
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		}
+
+		getAndLoadTestConfiguration(TEST_NAME);
+		String HOME = SCRIPT_DIR + TEST_DIR;
+
+		// write input matrices
+		int halfRows = rows / 2;
+		// We have two matrices handled by a single federated worker
+		double[][] X1 = getRandomMatrix(halfRows, cols, 0, 1, 1, 42);
+		double[][] X2 = getRandomMatrix(halfRows, cols, 0, 1, 1, 1340);
+		// And another two matrices handled by a single federated worker
+		double[][] Y1 = getRandomMatrix(halfRows, cols, 0, 1, 1, 44);
+		double[][] Y2 = getRandomMatrix(halfRows, cols, 0, 1, 1, 21);
+
+		writeInputMatrixWithMTD("X1", X1, false, new MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
+		writeInputMatrixWithMTD("X2", X2, false, new MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
+		writeInputMatrixWithMTD("Y1", Y1, false, new MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
+		writeInputMatrixWithMTD("Y2", Y2, false, new MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
+
+		int port1 = getRandomAvailablePort();
+		int port2 = getRandomAvailablePort();
+		Thread t1 = startLocalFedWorker(port1);
+		Thread t2 = startLocalFedWorker(port2);
+
+		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+		loadTestConfiguration(config);
+
+		// Run reference dml script with normal matrix
+		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+		programArgs = new String[] {"-nvargs", "X1=" + input("X1"), "X2=" + input("X2"), "Y1=" + input("Y1"),
+			"Y2=" + input("Y2"), "Z=" + expected("Z")};
+		runTest(true, false, null, -1);
+
+		// Run actual dml script with federated matrix
+		fullDMLScriptName = HOME + TEST_NAME + ".dml";
+		programArgs = new String[] {"-nvargs", "X1=" + TestUtils.federatedAddress(port1, input("X1")),
+			"X2=" + TestUtils.federatedAddress(port2, input("X2")),
+			"Y1=" + TestUtils.federatedAddress(port1, input("Y1")),
+			"Y2=" + TestUtils.federatedAddress(port2, input("Y2")), "r=" + rows, "c=" + cols, "Z=" + output("Z")};
+		runTest(true, false, null, -1);
+
+		// compare via files
+		compareResults(1e-9);
+
+		TestUtils.shutdownThreads(t1, t2);
+
+		rtplatform = platformOld;
+		DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedBinaryVectorTest.java b/src/test/java/org/apache/sysds/test/functions/federated/FederatedBinaryVectorTest.java
new file mode 100644
index 0000000..0016331
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/FederatedBinaryVectorTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedBinaryVectorTest extends AutomatedTestBase {
+
+	private final static String TEST_DIR = "functions/federated/";
+	// Using same test base as binary matrix test
+	private final static String TEST_NAME = "FederatedBinaryMatrixTest";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedBinaryVectorTest.class.getSimpleName() + "/";
+
+	private final static int blocksize = 1024;
+	@Parameterized.Parameter()
+	public int rows;
+	@Parameterized.Parameter(1)
+	public int cols;
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"Z"}));
+	}
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		// rows have to be even and > 1
+		return Arrays.asList(new Object[][] {
+            {2, 1000}, 
+            {10, 100}, {100, 10}, {1000, 1}, {10, 2000}, {2000, 10}
+        });
+	}
+
+	@Test
+	public void federatedMultiplyCP() {
+		federatedMultiply(Types.ExecMode.SINGLE_NODE);
+	}
+
+	/*
+	 * FIXME spark execution mode support
+	 * 
+	 * @Test public void federatedMultiplySP() { federatedMultiply(Types.ExecMode.SPARK); }
+	 */
+
+	public void federatedMultiply(Types.ExecMode execMode) {
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		Types.ExecMode platformOld = rtplatform;
+		rtplatform = execMode;
+		if(rtplatform == Types.ExecMode.SPARK) {
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		}
+
+		getAndLoadTestConfiguration(TEST_NAME);
+		String HOME = SCRIPT_DIR + TEST_DIR;
+
+		// write input matrices
+		int halfRows = rows / 2;
+		// We have two matrices handled by a single federated worker
+		double[][] X1 = getRandomMatrix(halfRows, cols, 0, 1, 1, 42);
+		double[][] X2 = getRandomMatrix(halfRows, cols, 0, 1, 1, 1340);
+		// And another two matrices handled by a single federated worker
+		double[][] Y1 = getRandomMatrix(halfRows, 1, 0, 1, 1, 44);
+		double[][] Y2 = getRandomMatrix(halfRows, 1, 0, 1, 1, 21);
+
+		writeInputMatrixWithMTD("X1", X1, false, new MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
+		writeInputMatrixWithMTD("X2", X2, false, new MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
+		writeInputMatrixWithMTD("Y1", Y1, false, new MatrixCharacteristics(halfRows, 1, blocksize, halfRows));
+		writeInputMatrixWithMTD("Y2", Y2, false, new MatrixCharacteristics(halfRows, 1, blocksize, halfRows));
+
+		int port1 = getRandomAvailablePort();
+		int port2 = getRandomAvailablePort();
+		Thread t1 = startLocalFedWorker(port1);
+		Thread t2 = startLocalFedWorker(port2);
+
+		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+		loadTestConfiguration(config);
+
+		// Run reference dml script with normal matrix
+		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+		programArgs = new String[] {"-nvargs", "X1=" + input("X1"), "X2=" + input("X2"), "Y1=" + input("Y1"),
+			"Y2=" + input("Y2"), "Z=" + expected("Z")};
+		runTest(true, false, null, -1);
+
+		// Run actual dml script with federated matrix
+		fullDMLScriptName = HOME + TEST_NAME + ".dml";
+		programArgs = new String[] {"-nvargs", "X1=" + TestUtils.federatedAddress(port1, input("X1")),
+			"X2=" + TestUtils.federatedAddress(port2, input("X2")),
+			"Y1=" + TestUtils.federatedAddress(port1, input("Y1")),
+			"Y2=" + TestUtils.federatedAddress(port2, input("Y2")), "r=" + rows, "c=" + cols, "Z=" + output("Z")};
+		runTest(true, false, null, -1);
+
+		// compare via files
+		compareResults(1e-9);
+
+		TestUtils.shutdownThreads(t1, t2);
+
+		rtplatform = platformOld;
+		DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedLogRegTest.java b/src/test/java/org/apache/sysds/test/functions/federated/FederatedLogRegTest.java
new file mode 100644
index 0000000..5a52bb6
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/FederatedLogRegTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedLogRegTest extends AutomatedTestBase {
+
+	private final static String TEST_DIR = "functions/federated/";
+	private final static String TEST_NAME = "FederatedLogRegTest";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedGLMTest.class.getSimpleName() + "/";
+
+	private final static int blocksize = 1024;
+	@Parameterized.Parameter()
+	public int rows;
+	@Parameterized.Parameter(1)
+	public int cols;
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"Z"}));
+	}
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		// rows have to be even and > 1
+		return Arrays.asList(new Object[][] {{10000, 10}, {1000, 100}, {2000, 43}});
+	}
+
+	@Test
+	public void federatedSinglenodeGLM() {
+		federatedGLM(Types.ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	public void federatedHybridGLM() {
+		federatedGLM(Types.ExecMode.HYBRID);
+	}
+
+	
+	public void federatedGLM(Types.ExecMode execMode) {
+		ExecMode platformOld = setExecMode(execMode);
+
+		getAndLoadTestConfiguration(TEST_NAME);
+		String HOME = SCRIPT_DIR + TEST_DIR;
+
+		// write input matrices
+		int halfRows = rows / 2;
+		// We have two matrices handled by a single federated worker
+		double[][] X1 = getRandomMatrix(halfRows, cols, 0, 1, 1, 42);
+		double[][] X2 = getRandomMatrix(halfRows, cols, 0, 1, 1, 1340);
+		double[][] Y = getRandomMatrix(rows, 1, -1, 1, 1, 1233);
+		for(int i = 0; i < rows; i++)
+			Y[i][0] = (Y[i][0] > 0) ? 1 : -1;
+
+		writeInputMatrixWithMTD("X1", X1, false, new MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
+		writeInputMatrixWithMTD("X2", X2, false, new MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
+		writeInputMatrixWithMTD("Y", Y, false, new MatrixCharacteristics(rows, 1, blocksize, rows));
+
+		// empty script name because we don't execute any script, just start the worker
+		fullDMLScriptName = "";
+		int port1 = getRandomAvailablePort();
+		int port2 = getRandomAvailablePort();
+		Thread t1 = startLocalFedWorker(port1);
+		Thread t2 = startLocalFedWorker(port2);
+
+		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+		loadTestConfiguration(config);
+		setOutputBuffering(false);
+		
+		// Run reference dml script with normal matrix
+		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+		programArgs = new String[] {"-args", input("X1"), input("X2"), input("Y"), expected("Z")};
+		runTest(true, false, null, -1);
+
+		// Run actual dml script with federated matrix
+		fullDMLScriptName = HOME + TEST_NAME + ".dml";
+		programArgs = new String[] {"-stats",
+			"-nvargs", "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+			"in_X2=" + TestUtils.federatedAddress(port2, input("X2")), "rows=" + rows, "cols=" + cols,
+			"in_Y=" + input("Y"), "out=" + output("Z")};
+		runTest(true, false, null, -1);
+
+		// compare via files
+		compareResults(1e-9);
+
+		TestUtils.shutdownThreads(t1, t2);
+
+		// check for federated operations
+		Assert.assertTrue(heavyHittersContainsString("fed_ba+*"));
+		Assert.assertTrue(heavyHittersContainsString("fed_uark+","fed_uarsqk+"));
+		Assert.assertTrue(heavyHittersContainsString("fed_uack+"));
+		Assert.assertTrue(heavyHittersContainsString("fed_uak+"));
+		Assert.assertTrue(heavyHittersContainsString("fed_mmchain"));
+		
+		//check that federated input files are still existing
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+		
+		resetExecMode(platformOld);
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedYL2SVMTest.java b/src/test/java/org/apache/sysds/test/functions/federated/FederatedYL2SVMTest.java
new file mode 100644
index 0000000..27323d8
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/FederatedYL2SVMTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedYL2SVMTest extends AutomatedTestBase {
+
+	private final static String TEST_DIR = "functions/federated/";
+	private final static String TEST_NAME = "FederatedYL2SVMTest";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedYL2SVMTest.class.getSimpleName() + "/";
+
+	private final static int blocksize = 1024;
+	@Parameterized.Parameter()
+	public int rows;
+	@Parameterized.Parameter(1)
+	public int cols;
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"Z"}));
+	}
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		// rows have to be even and > 1
+		return Arrays.asList(new Object[][] {
+			// {2, 1000}, {10, 100}, {100, 10}, {1000, 1}, {10, 2000},
+			 {2000, 10}});
+	}
+
+	@Test
+	public void federatedL2SVMCP() {
+		federatedL2SVM(Types.ExecMode.SINGLE_NODE);
+	}
+
+	/*
+	 * TODO support SPARK execution mode -> RDDs and SPARK instructions lead to quite a few problems
+	 * 
+	 * @Test public void federatedL2SVMSP() { federatedL2SVM(Types.ExecMode.SPARK); }
+	 */
+
+	public void federatedL2SVM(Types.ExecMode execMode) {
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		Types.ExecMode platformOld = rtplatform;
+		rtplatform = execMode;
+		if(rtplatform == Types.ExecMode.SPARK) {
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		}
+		Thread t1, t2;
+
+		getAndLoadTestConfiguration(TEST_NAME);
+		String HOME = SCRIPT_DIR + TEST_DIR;
+
+		// write input matrices
+		int halfRows = rows / 2;
+		// We have two matrices handled by a single federated worker
+		double[][] X1 = getRandomMatrix(halfRows, cols, 0, 1, 1, 42);
+		double[][] X2 = getRandomMatrix(halfRows, cols, 0, 1, 1, 1340);
+		double[][] Y1 = getRandomMatrix(halfRows, 1, -1, 1, 1, 1233);
+		double[][] Y2 = getRandomMatrix(halfRows, 1, -1, 1, 1, 13);
+
+		for(int i = 0; i < halfRows; i++) {
+			Y1[i][0] = (Y1[i][0] > 0) ? 1 : -1;
+			Y2[i][0] = (Y2[i][0] > 0) ? 1 : -1;
+		}
+
+		writeInputMatrixWithMTD("X1", X1, false, new MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
+		writeInputMatrixWithMTD("X2", X2, false, new MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
+		writeInputMatrixWithMTD("Y1", Y1, false, new MatrixCharacteristics(halfRows, 1, blocksize, halfRows));
+		writeInputMatrixWithMTD("Y2", Y2, false, new MatrixCharacteristics(halfRows, 1, blocksize, halfRows));
+
+		// empty script name because we don't execute any script, just start the worker
+		fullDMLScriptName = "";
+		int port1 = getRandomAvailablePort();
+		int port2 = getRandomAvailablePort();
+		t1 = startLocalFedWorker(port1);
+		t2 = startLocalFedWorker(port2);
+
+		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+		loadTestConfiguration(config);
+		setOutputBuffering(false);
+
+		// Run reference dml script with normal matrix
+		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+		programArgs = new String[] {"-args", input("X1"), input("X2"), input("Y1"), input("Y2"), expected("Z")};
+		runTest(true, false, null, -1);
+
+		// Run actual dml script with federated matrixz
+		fullDMLScriptName = HOME + TEST_NAME + ".dml";
+		programArgs = new String[] {"-nvargs", "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+			"in_X2=" + TestUtils.federatedAddress(port2, input("X2")), "rows=" + rows, "cols=" + cols,
+			"in_Y1=" + TestUtils.federatedAddress(port1, input("Y1")),
+			"in_Y2=" + TestUtils.federatedAddress(port2, input("Y2")), "out=" + output("Z")};
+		runTest(true, false, null, -1);
+
+		// compare via files
+		compareResults(1e-9);
+
+		TestUtils.shutdownThreads(t1, t2);
+
+		rtplatform = platformOld;
+		DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+	}
+}
diff --git a/src/test/scripts/functions/federated/FederatedBinaryMatrixTest.dml b/src/test/scripts/functions/federated/FederatedBinaryMatrixTest.dml
new file mode 100644
index 0000000..34b71b7
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedBinaryMatrixTest.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = federated(addresses=list($X1, $X2),
+    ranges=list(list(0, 0), list($r / 2, $c), list($r / 2, 0), list($r, $c)))
+Y = federated(addresses=list($Y1, $Y2),
+    ranges=list(list(0, 0), list($r / 2, $c), list($r / 2, 0), list($r, $c)))
+Z = X + Y
+write(Z, $Z)
diff --git a/src/test/scripts/functions/federated/FederatedBinaryMatrixTestReference.dml b/src/test/scripts/functions/federated/FederatedBinaryMatrixTestReference.dml
new file mode 100644
index 0000000..4fdd7c5
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedBinaryMatrixTestReference.dml
@@ -0,0 +1,25 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rbind(read($X1), read($X2))
+Y = rbind(read($Y1), read($Y2))
+Z = X + Y
+write(Z, $Z)
diff --git a/src/test/scripts/functions/federated/FederatedLogRegTest.dml b/src/test/scripts/functions/federated/FederatedLogRegTest.dml
new file mode 100644
index 0000000..063d630
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedLogRegTest.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = federated(addresses=list($in_X1, $in_X2),
+    ranges=list(list(0, 0), list($rows / 2, $cols), list($rows / 2, 0), list($rows, $cols)))
+Y = read($in_Y)
+model = multiLogReg(X=X,  Y=Y, tol=1e-5, maxi=30)
+write(model, $out)
diff --git a/src/test/scripts/functions/federated/FederatedLogRegTestReference.dml b/src/test/scripts/functions/federated/FederatedLogRegTestReference.dml
new file mode 100644
index 0000000..a1f75b5
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedLogRegTestReference.dml
@@ -0,0 +1,25 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rbind(read($1), read($2))
+Y = read($3)
+model = multiLogReg(X=X,  Y=Y, tol=1e-5, maxi=30)
+write(model, $4)
diff --git a/src/test/scripts/functions/federated/FederatedYL2SVMTest.dml b/src/test/scripts/functions/federated/FederatedYL2SVMTest.dml
new file mode 100644
index 0000000..b1f5fdc
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedYL2SVMTest.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = federated(addresses=list($in_X1, $in_X2),
+    ranges=list(list(0, 0), list($rows / 2, $cols), list($rows / 2, 0), list($rows, $cols)))
+Y = federated(addresses=list($in_Y1, $in_Y2),
+    ranges=list(list(0, 0), list($rows / 2, 1), list($rows / 2, 0), list($rows, 1)))
+model = l2svm(X=X,  Y=Y, intercept = FALSE, epsilon = 1e-12, lambda = 1, maxIterations = 100)
+write(model, $out)
diff --git a/src/test/scripts/functions/federated/FederatedYL2SVMTestReference.dml b/src/test/scripts/functions/federated/FederatedYL2SVMTestReference.dml
new file mode 100644
index 0000000..c17a655
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedYL2SVMTestReference.dml
@@ -0,0 +1,25 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rbind(read($1), read($2))
+Y = rbind(read($3), read($4))
+model = l2svm(X=X,  Y=Y, intercept = FALSE, epsilon = 1e-12, lambda = 1, maxIterations = 100)
+write(model, $5)