[SYSTEMDS-2638] Federated stats for Worker

Federated workers now are able to output stats of execution, if the -stats
flag is set. The stats are printed at every clear command sendt to the
worker. Each time the stats are printed the stats are also reset.

This commit also change the test setup to worker now in separated JVM in
test frame work. To enable this the federated worker startup wait time
have been increased to 1 sec from 0.75 sec
diff --git a/.github/workflows/functionsTests.yml b/.github/workflows/functionsTests.yml
index 63fd85e..7d0349a 100644
--- a/.github/workflows/functionsTests.yml
+++ b/.github/workflows/functionsTests.yml
@@ -54,7 +54,8 @@
           data.rand,
           data.tensor,
           dnn,
-          federated,
+          federated.algorithms,
+          federated.primitives,
           frame,
           indexing,
           io,
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 0dcb846..b09b75d 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -19,14 +19,14 @@
 
 package org.apache.sysds.runtime.controlprogram.federated;
 
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.Arrays;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.FileFormat;
@@ -52,11 +52,13 @@
 import org.apache.sysds.runtime.privacy.PrivacyMonitor;
 import org.apache.sysds.runtime.privacy.PrivacyPropagator;
 import org.apache.sysds.utils.JSONHelper;
+import org.apache.sysds.utils.Statistics;
 import org.apache.wink.json4j.JSONObject;
 
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.util.Arrays;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 
 public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 	protected static Logger log = Logger.getLogger(FederatedWorkerHandler.class);
@@ -110,6 +112,11 @@
 			else if( response == null && i == requests.length-1 ) {
 				response = tmp; //return last
 			}
+
+			if (DMLScript.STATISTICS && request.getType() == RequestType.CLEAR){
+				System.out.println("Federated Worker " + Statistics.display());
+				Statistics.reset();
+			}
 		}
 		ctx.writeAndFlush(response).addListener(new CloseListener());
 	}
diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java b/src/main/java/org/apache/sysds/utils/Statistics.java
index b498b0e..8f22ab4 100644
--- a/src/main/java/org/apache/sysds/utils/Statistics.java
+++ b/src/main/java/org/apache/sysds/utils/Statistics.java
@@ -501,6 +501,11 @@
 		nativeConv2dBwdFilterTime = 0;
 		nativeConv2dBwdDataTime = 0;
 		LibMatrixDNN.resetStatistics();
+		federatedReadCount.reset();
+		federatedPutCount.reset();
+		federatedGetCount.reset();
+		federatedExecuteInstructionCount.reset();
+		federatedExecuteUDFCount.reset();
 	}
 
 	public static void resetJITCompileTime(){
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index bfb2e5d..7bca262 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -19,6 +19,7 @@
 
 package org.apache.sysds.test;
 
+import static java.lang.Thread.sleep;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -100,7 +101,7 @@
 	public static final boolean TEST_GPU = false;
 	public static final double GPU_TOLERANCE = 1e-9;
 
-	public static final int FED_WORKER_WAIT = 750; // in ms
+	public static final int FED_WORKER_WAIT = 1000; // in ms
 
 	// With OpenJDK 8u242 on Windows, the new changes in JDK are not allowing
 	// to set the native library paths internally thus breaking the code.
@@ -1283,38 +1284,40 @@
 		}
 	}
 
-	protected Thread startLocalFedWorker(int port) {
-		Thread t = null;
-		String[] fedWorkArgs = {"-w", Integer.toString(port)};
-		ArrayList<String> args = new ArrayList<>();
+	/**
+	 * Start new JVM for a federated worker at the port.
+	 * 
+	 * 
+	 * @param port Port to use for the JVM
+	 * @return the process associated with the worker.
+	 */
+	protected Process startLocalFedWorker(int port) {
+		Process process = null;
+		String separator = System.getProperty("file.separator");
+		String classpath = System.getProperty("java.class.path");
+		String path = System.getProperty("java.home")
+					+ separator + "bin" + separator + "java";
+		ProcessBuilder processBuilder = new ProcessBuilder(path, "-cp", 
+				classpath, DMLScript.class.getName(), "-w",  Integer.toString(port), "-stats");
 
-		addProgramIndependentArguments(args);
-
-		for(int i = 0; i < fedWorkArgs.length; i++)
-			args.add(fedWorkArgs[i]);
-
-		String[] finalArguments = args.toArray(new String[args.size()]);
-
-		try {
-			t = new Thread(() -> {
-				try {
-					main(finalArguments);
-				}
-				catch(IOException e) {
-				}
-			});
-			t.start();
-			java.util.concurrent.TimeUnit.MILLISECONDS.sleep(FED_WORKER_WAIT);
+		try{
+			process = processBuilder.start();
+			// Give some time to startup the worker.
+			sleep(FED_WORKER_WAIT);
+		} catch (IOException | InterruptedException e){
+			e.printStackTrace();
 		}
-		catch(InterruptedException e) {
-			// Should happen at closing of the worker so don't print
-		}
-		return t;
+		return process;
 	}
 
+	/**
+	 * Start java worker in same JVM.
+	 * @param args the command line arguments
+	 * @return the thread associated with the process.s
+	 */
 	public static Thread startLocalFedWorkerWithArgs(String[] args) {
 		Thread t = null;
-
+		
 		try {
 			t = new Thread(() -> {
 				try {
diff --git a/src/test/java/org/apache/sysds/test/TestUtils.java b/src/test/java/org/apache/sysds/test/TestUtils.java
index d5aea1c..1e38e2c 100644
--- a/src/test/java/org/apache/sysds/test/TestUtils.java
+++ b/src/test/java/org/apache/sysds/test/TestUtils.java
@@ -2469,6 +2469,11 @@
 		for( Thread t : ts )
 			shutdownThread(t);
 	}
+
+	public static void shutdownThreads(Process... ts) {
+		for( Process t : ts )
+			shutdownThread(t);
+	}
 	
 	public static void shutdownThread(Thread t) {
 		// kill the worker
@@ -2482,6 +2487,19 @@
 			}
 		}
 	}
+
+	public static void shutdownThread(Process t) {
+		// kill the worker
+		if( t != null ) {
+			Process d = t.destroyForcibly();
+			try {
+				d.waitFor();
+			}
+			catch (InterruptedException e) {
+				e.printStackTrace();
+			}
+		}
+	}
 	
 	public static String federatedAddress(int port, String input) {
 		return federatedAddress("localhost", port, input);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedBivarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
similarity index 96%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedBivarTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
index 906beec..96fef5d 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedBivarTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.algorithms;
 
 import org.apache.sysds.common.Types;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -116,10 +116,10 @@
 		int port2 = getRandomAvailablePort();
 		int port3 = getRandomAvailablePort();
 		int port4 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorker(port1);
-		Thread t2 = startLocalFedWorker(port2);
-		Thread t3 = startLocalFedWorker(port3);
-		Thread t4 = startLocalFedWorker(port4);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
+		Process t3 = startLocalFedWorker(port3);
+		Process t4 = startLocalFedWorker(port4);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedGLMTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
similarity index 96%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedGLMTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
index 009d921..636b279 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedGLMTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.algorithms;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -94,8 +94,8 @@
 		fullDMLScriptName = "";
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorker(port1);
-		Thread t2 = startLocalFedWorker(port2);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedKmeansTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
similarity index 97%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedKmeansTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
index 7b60476..933e971 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedKmeansTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.algorithms;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -101,8 +101,8 @@
 		fullDMLScriptName = "";
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorker(port1);
-		Thread t2 = startLocalFedWorker(port2);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedL2SVMTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
similarity index 98%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedL2SVMTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
index 4cfc70e..4caf52e 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedL2SVMTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.algorithms;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -76,7 +76,7 @@
 		if(rtplatform == Types.ExecMode.SPARK) {
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 		}
-		Thread t1, t2;
+		Process t1, t2;
 
 		getAndLoadTestConfiguration(TEST_NAME);
 		String HOME = SCRIPT_DIR + TEST_DIR;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedLogRegTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
similarity index 75%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedLogRegTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
index 1c07bfc..fef8889 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedLogRegTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
@@ -17,12 +17,13 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.algorithms;
 
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Collection;
+
 import org.apache.sysds.common.Types;
 import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -30,9 +31,10 @@
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
-
-import java.util.Arrays;
-import java.util.Collection;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 @RunWith(value = Parameterized.class)
 @net.jcip.annotations.NotThreadSafe
@@ -61,17 +63,16 @@
 	}
 
 	@Test
-	public void federatedSinglenodeGLM() {
-		federatedGLM(Types.ExecMode.SINGLE_NODE);
+	public void federatedSinglenodeLogReg() {
+		federatedLogReg(Types.ExecMode.SINGLE_NODE);
 	}
 
 	@Test
-	public void federatedHybridGLM() {
-		federatedGLM(Types.ExecMode.HYBRID);
+	public void federatedHybridLogReg() {
+		federatedLogReg(Types.ExecMode.HYBRID);
 	}
 
-	
-	public void federatedGLM(Types.ExecMode execMode) {
+	public void federatedLogReg(Types.ExecMode execMode) {
 		ExecMode platformOld = setExecMode(execMode);
 
 		getAndLoadTestConfiguration(TEST_NAME);
@@ -94,13 +95,25 @@
 		fullDMLScriptName = "";
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorker(port1);
-		Thread t2 = startLocalFedWorker(port2);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
+
+		BufferedReader output = new BufferedReader(new InputStreamReader(t1.getInputStream()));
+		BufferedReader error = new BufferedReader(new InputStreamReader(t1.getInputStream()));
+
+		Thread t = new Thread(() -> {
+			output.lines().forEach(s -> System.out.println(s));
+		});
+		Thread te = new Thread(() -> {
+			error.lines().forEach(s -> System.err.println(s));
+		});
+		t.start();
+		te.start();
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
 		setOutputBuffering(false);
-		
+
 		// Run reference dml script with normal matrix
 		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
 		programArgs = new String[] {"-args", input("X1"), input("X2"), input("Y"), expected("Z")};
@@ -108,8 +121,8 @@
 
 		// Run actual dml script with federated matrix
 		fullDMLScriptName = HOME + TEST_NAME + ".dml";
-		programArgs = new String[] {"-stats", "30",
-			"-nvargs", "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+		programArgs = new String[] {"-stats", "30", "-nvargs",
+			"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
 			"in_X2=" + TestUtils.federatedAddress(port2, input("X2")), "rows=" + rows, "cols=" + cols,
 			"in_Y=" + input("Y"), "out=" + output("Z")};
 		runTest(true, false, null, -1);
@@ -118,16 +131,19 @@
 		compareResults(1e-9);
 
 		TestUtils.shutdownThreads(t1, t2);
+		TestUtils.shutdownThreads(t, te);
 
 		// check for federated operations
-		Assert.assertTrue("contains federated matrix mult",heavyHittersContainsString("fed_ba+*"));
-		Assert.assertTrue("contains federated row unary aggregate",heavyHittersContainsString("fed_uark+","fed_uarsqk+"));
-		Assert.assertTrue("contains federated matrix mult chain or transpose",heavyHittersContainsString("fed_mmchain", "fed_r'"));
-		
-		//check that federated input files are still existing
+		Assert.assertTrue("contains federated matrix mult", heavyHittersContainsString("fed_ba+*"));
+		Assert.assertTrue("contains federated row unary aggregate",
+			heavyHittersContainsString("fed_uark+", "fed_uarsqk+"));
+		Assert.assertTrue("contains federated matrix mult chain or transpose",
+			heavyHittersContainsString("fed_mmchain", "fed_r'"));
+
+		// check that federated input files are still existing
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
 		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
-		
+
 		resetExecMode(platformOld);
 	}
 }
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedPCATest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
similarity index 95%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedPCATest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
index 4b4457a..7a5a2fd 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedPCATest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.algorithms;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -101,10 +101,10 @@
 		int port2 = getRandomAvailablePort();
 		int port3 = getRandomAvailablePort();
 		int port4 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorker(port1);
-		Thread t2 = startLocalFedWorker(port2);
-		Thread t3 = startLocalFedWorker(port3);
-		Thread t4 = startLocalFedWorker(port4);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
+		Process t3 = startLocalFedWorker(port3);
+		Process t4 = startLocalFedWorker(port4);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedUnivarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
similarity index 95%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedUnivarTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
index a704d3a..c042cbb 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedUnivarTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.algorithms;
 
 import org.apache.sysds.common.Types;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -101,10 +101,10 @@
 		int port2 = getRandomAvailablePort();
 		int port3 = getRandomAvailablePort();
 		int port4 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorker(port1);
-		Thread t2 = startLocalFedWorker(port2);
-		Thread t3 = startLocalFedWorker(port3);
-		Thread t4 = startLocalFedWorker(port4);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
+		Process t3 = startLocalFedWorker(port3);
+		Process t4 = startLocalFedWorker(port4);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedYL2SVMTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedYL2SVMTest.java
similarity index 98%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedYL2SVMTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedYL2SVMTest.java
index 27323d8..ebdee88 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedYL2SVMTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedYL2SVMTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.algorithms;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -78,7 +78,7 @@
 		if(rtplatform == Types.ExecMode.SPARK) {
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 		}
-		Thread t1, t2;
+		Process t1, t2;
 
 		getAndLoadTestConfiguration(TEST_NAME);
 		String HOME = SCRIPT_DIR + TEST_DIR;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedBinaryMatrixTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
similarity index 96%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedBinaryMatrixTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
index bb8ae52..052234d 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedBinaryMatrixTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.primitives;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -99,8 +99,8 @@
 
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorker(port1);
-		Thread t2 = startLocalFedWorker(port2);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedBinaryVectorTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
similarity index 96%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedBinaryVectorTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
index 0016331..187648b 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedBinaryVectorTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.primitives;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -100,8 +100,8 @@
 
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorker(port1);
-		Thread t2 = startLocalFedWorker(port2);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedConstructionTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedConstructionTest.java
similarity index 97%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedConstructionTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedConstructionTest.java
index 8125bfe..74180be 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedConstructionTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedConstructionTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.primitives;
 
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types;
@@ -120,7 +120,7 @@
 		String HOME = SCRIPT_DIR + TEST_DIR;
 
 		int port = getRandomAvailablePort();
-		Thread t = startLocalFedWorker(port);
+		Process t = startLocalFedWorker(port);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedMatrixScalarOperationsTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMatrixScalarOperationsTest.java
similarity index 98%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedMatrixScalarOperationsTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMatrixScalarOperationsTest.java
index f67bd9b..becb246 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedMatrixScalarOperationsTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMatrixScalarOperationsTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.primitives;
 
 import org.junit.Test;
 import org.junit.runners.Parameterized;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedMultiplyTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
similarity index 96%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedMultiplyTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
index 7968dd7..c75b0b5 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedMultiplyTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.primitives;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -96,8 +96,8 @@
 
 		int port1 = getRandomAvailablePort();
 		int port2 = getRandomAvailablePort();
-		Thread t1 = startLocalFedWorker(port1);
-		Thread t2 = startLocalFedWorker(port2);
+		Process t1 = startLocalFedWorker(port1);
+		Process t2 = startLocalFedWorker(port2);
 
 		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
 		loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedNegativeTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedNegativeTest.java
similarity index 97%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedNegativeTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedNegativeTest.java
index 8c60cec..5cac52c 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedNegativeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedNegativeTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.primitives;
 
 import org.apache.sysds.common.Types;
 import org.apache.sysds.runtime.controlprogram.federated.*;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedRCBindTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
similarity index 97%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedRCBindTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
index 0f28a7b..e59eea8 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedRCBindTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.primitives;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -79,7 +79,7 @@
 		writeInputMatrixWithMTD("A", A, false, new MatrixCharacteristics(rows, cols, blocksize, rows * cols));
 
 		int port = getRandomAvailablePort();
-		Thread t = startLocalFedWorker(port);
+		Process t = startLocalFedWorker(port);
 
 		// we need the reference file to not be written to hdfs, so we get the correct format
 		rtplatform = Types.ExecMode.SINGLE_NODE;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedSumTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSumTest.java
similarity index 97%
rename from src/test/java/org/apache/sysds/test/functions/federated/FederatedSumTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSumTest.java
index 69a743c..c6d4be6 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/FederatedSumTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSumTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.federated;
+package org.apache.sysds.test.functions.federated.primitives;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -78,7 +78,7 @@
 		double[][] A = getRandomMatrix(rows / 2, cols, -10, 10, 1, 1);
 		writeInputMatrixWithMTD("A", A, false, new MatrixCharacteristics(rows / 2, cols, blocksize, (rows / 2) * cols));
 		int port = getRandomAvailablePort();
-		Thread t = startLocalFedWorker(port);
+		Process t = startLocalFedWorker(port);
 
 		// we need the reference file to not be written to hdfs, so we get the correct format
 		rtplatform = Types.ExecMode.SINGLE_NODE;
diff --git a/src/test/java/org/apache/sysds/test/functions/privacy/FederatedL2SVMTest.java b/src/test/java/org/apache/sysds/test/functions/privacy/FederatedL2SVMTest.java
index 6ecb5ba..b3180cc 100644
--- a/src/test/java/org/apache/sysds/test/functions/privacy/FederatedL2SVMTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/privacy/FederatedL2SVMTest.java
@@ -314,7 +314,7 @@
 		if(rtplatform == Types.ExecMode.SPARK) {
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 		}
-		Thread t1 = null, t2 = null;
+		Process t1 = null, t2 = null;
 
 		try {
 			getAndLoadTestConfiguration(TEST_NAME);
diff --git a/src/test/java/org/apache/sysds/test/functions/privacy/FederatedWorkerHandlerTest.java b/src/test/java/org/apache/sysds/test/functions/privacy/FederatedWorkerHandlerTest.java
index 3c2cbd6..6c7ce4b 100644
--- a/src/test/java/org/apache/sysds/test/functions/privacy/FederatedWorkerHandlerTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/privacy/FederatedWorkerHandlerTest.java
@@ -154,7 +154,6 @@
 		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
 		Types.ExecMode platformOld = rtplatform;
 
-		Thread t;
 
 		getAndLoadTestConfiguration("aggregation");
 		String HOME = SCRIPT_DIR + TEST_DIR;
@@ -162,7 +161,7 @@
 		double[][] A = getRandomMatrix(rows, cols, -10, 10, 1, 1);
 		writeInputMatrixWithMTD("A", A, false, new MatrixCharacteristics(rows, cols, blocksize, rows * cols), new PrivacyConstraint(privacyLevel));
 		int port = getRandomAvailablePort();
-		t = startLocalFedWorker(port);
+		Process t = startLocalFedWorker(port);
 
 		// we need the reference file to not be written to hdfs, so we get the correct format
 		rtplatform = Types.ExecMode.SINGLE_NODE;
@@ -224,7 +223,6 @@
 		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
 		Types.ExecMode platformOld = rtplatform;
 
-		Thread t;
 
 		getAndLoadTestConfiguration("transfer");
 		String HOME = SCRIPT_DIR + TEST_DIR;
@@ -233,7 +231,7 @@
 		writeInputMatrixWithMTD("A", A, false, new MatrixCharacteristics(rows, cols, blocksize, rows * cols), new PrivacyConstraint(privacyLevel));
 
 		int port = getRandomAvailablePort();
-		t = startLocalFedWorker(port);
+		Process t = startLocalFedWorker(port);
 
 		// we need the reference file to not be written to hdfs, so we get the correct format
 		rtplatform = Types.ExecMode.SINGLE_NODE;
@@ -289,7 +287,7 @@
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 		}
 
-		Thread t1, t2;
+		Process t1, t2;
 
 		getAndLoadTestConfiguration("matvecmult");
 		String HOME = SCRIPT_DIR + TEST_DIR;
diff --git a/src/test/java/org/apache/sysds/test/functions/transform/TransformFederatedEncodeDecodeTest.java b/src/test/java/org/apache/sysds/test/functions/transform/TransformFederatedEncodeDecodeTest.java
index 1f9c87d..6a5653e 100644
--- a/src/test/java/org/apache/sysds/test/functions/transform/TransformFederatedEncodeDecodeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/transform/TransformFederatedEncodeDecodeTest.java
@@ -87,7 +87,7 @@
 		ExecMode platformOld = rtplatform;
 		rtplatform = ExecMode.SINGLE_NODE;
 
-		Thread t1 = null, t2 = null, t3 = null, t4 = null;
+		Process t1 = null, t2 = null, t3 = null, t4 = null;
 		try {
 			getAndLoadTestConfiguration(TEST_NAME1);