[SYSTEMDS-2721] Add Federated SSL

This commit adds SSL encryption for Federated Transfer of java objects.
This initial design use a self signed certificate, that is fundamentally
inssecure, because the certificate is implicitly trusted per default.

Another task is added to use signed certificates, but for the purpose of
encrypted transfer experiments this is sufficient.
diff --git a/src/main/java/org/apache/sysds/api/DMLScript.java b/src/main/java/org/apache/sysds/api/DMLScript.java
index d32731f..dfe6825 100644
--- a/src/main/java/org/apache/sysds/api/DMLScript.java
+++ b/src/main/java/org/apache/sysds/api/DMLScript.java
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.security.cert.CertificateException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
@@ -78,7 +79,6 @@
 import org.apache.sysds.utils.Explain.ExplainCounts;
 import org.apache.sysds.utils.Explain.ExplainType;
 
-
 public class DMLScript 
 {
 	private static ExecMode   EXEC_MODE          = DMLOptions.defaultOptions.execMode;     // the execution mode
@@ -237,12 +237,17 @@
 				return true;
 			}
 			
-			if (dmlOptions.fedWorker) {
+			if(dmlOptions.fedWorker) {
 				loadConfiguration(fnameOptConfig);
-				new FederatedWorker(dmlOptions.fedWorkerPort).run();
+				try {
+					new FederatedWorker(dmlOptions.fedWorkerPort).run();
+				}
+				catch(CertificateException e) {
+					e.printStackTrace();
+				}
 				return true;
 			}
-			
+
 			LineageCacheConfig.setConfig(LINEAGE_REUSE);
 			LineageCacheConfig.setCachePolicy(LINEAGE_POLICY);
 
diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index 9c1b65a..49c83ee 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -93,6 +93,7 @@
 	public static final String PRINT_GPU_MEMORY_INFO = "sysds.gpu.print.memoryInfo";
 	public static final String EVICTION_SHADOW_BUFFERSIZE = "sysds.gpu.eviction.shadow.bufferSize";
 
+	public static final String USE_SSL_FEDERATED_COMMUNICATION = "sysds.federated.ssl"; // boolean
 	public static final int DEFAULT_FEDERATED_PORT = 4040; // borrowed default Spark Port
 	public static final int DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS = 2;
 	
@@ -138,6 +139,7 @@
 		_defaultVals.put(SYNCHRONIZE_GPU,        "false" );
 		_defaultVals.put(EAGER_CUDA_FREE,        "false" );
 		_defaultVals.put(FLOATING_POINT_PRECISION, "double" );
+		_defaultVals.put(USE_SSL_FEDERATED_COMMUNICATION, "false");
 	}
 	
 	public DMLConfig() {
@@ -385,7 +387,8 @@
 			CODEGEN, CODEGEN_API, CODEGEN_COMPILER, CODEGEN_OPTIMIZER, CODEGEN_PLANCACHE, CODEGEN_LITERALS,
 			STATS_MAX_WRAP_LEN, PRINT_GPU_MEMORY_INFO,
 			AVAILABLE_GPUS, SYNCHRONIZE_GPU, EAGER_CUDA_FREE, FLOATING_POINT_PRECISION, GPU_EVICTION_POLICY, 
-			LOCAL_SPARK_NUM_THREADS, EVICTION_SHADOW_BUFFERSIZE, GPU_MEMORY_ALLOCATOR, GPU_MEMORY_UTILIZATION_FACTOR
+			LOCAL_SPARK_NUM_THREADS, EVICTION_SHADOW_BUFFERSIZE, GPU_MEMORY_ALLOCATOR, GPU_MEMORY_UTILIZATION_FACTOR,
+			USE_SSL_FEDERATED_COMMUNICATION
 		}; 
 		
 		StringBuilder sb = new StringBuilder();
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
index 5a40456..f9702c0 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
@@ -26,9 +26,12 @@
 import java.util.Set;
 import java.util.concurrent.Future;
 
+import javax.net.ssl.SSLException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.common.Types;
+import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
@@ -38,6 +41,7 @@
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
@@ -45,13 +49,15 @@
 import io.netty.handler.codec.serialization.ClassResolvers;
 import io.netty.handler.codec.serialization.ObjectDecoder;
 import io.netty.handler.codec.serialization.ObjectEncoder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import io.netty.util.concurrent.Promise;
 
-
 public class FederatedData {
 	private static final Log LOG = LogFactory.getLog(FederatedData.class.getName());
 	private static final Set<InetSocketAddress> _allFedSites = new HashSet<>();
-	
+
 	private final Types.DataType _dataType;
 	private final InetSocketAddress _address;
 	private final String _filepath;
@@ -71,34 +77,34 @@
 	public InetSocketAddress getAddress() {
 		return _address;
 	}
-	
+
 	public void setVarID(long varID) {
 		_varID = varID;
 	}
-	
+
 	public long getVarID() {
 		return _varID;
 	}
-	
+
 	public String getFilepath() {
 		return _filepath;
 	}
 
-	public Types.DataType getDataType(){
+	public Types.DataType getDataType() {
 		return _dataType;
 	}
-	
+
 	public boolean isInitialized() {
 		return _varID != -1;
 	}
-	
+
 	boolean equalAddress(FederatedData that) {
-		return _address != null && that != null && that._address != null 
-			&& _address.equals(that._address);
+		return _address != null && that != null && that._address != null && _address.equals(that._address);
 	}
-	
+
 	/**
 	 * Make a copy of the <code>FederatedData</code> metadata, but use another varID (refer to another object on worker)
+	 * 
 	 * @param varID the varID of the variable we refer to
 	 * @return new <code>FederatedData</code> with different varID set
 	 */
@@ -107,7 +113,7 @@
 		copy.setVarID(varID);
 		return copy;
 	}
-	
+
 	public synchronized Future<FederatedResponse> initFederatedData(long id) {
 		if(isInitialized())
 			throw new DMLRuntimeException("Tried to init already initialized data");
@@ -119,61 +125,80 @@
 		request.appendParam(_dataType.name());
 		return executeFederatedOperation(request);
 	}
-	
+
 	public synchronized Future<FederatedResponse> executeFederatedOperation(FederatedRequest... request) {
-		return executeFederatedOperation(_address, request);
+
+		try {
+			return executeFederatedOperation(_address, request);
+		}
+		catch(SSLException e) {
+			throw new DMLRuntimeException("Error in SSL Connection", e);
+		}
 	}
-	
+
 	/**
 	 * Executes an federated operation on a federated worker.
 	 *
 	 * @param address socket address (incl host and port)
 	 * @param request the requested operation
 	 * @return the response
+	 * @throws SSLException Throws an SSL exception if the ssl construction fails.
 	 */
-	public static Future<FederatedResponse> executeFederatedOperation(InetSocketAddress address, FederatedRequest... request) {
-		// Careful with the number of threads. Each thread opens connections to multiple files making resulting in 
+	public static Future<FederatedResponse> executeFederatedOperation(InetSocketAddress address,
+		FederatedRequest... request) throws SSLException {
+		// Careful with the number of threads. Each thread opens connections to multiple files making resulting in
 		// java.io.IOException: Too many open files
 		EventLoopGroup workerGroup = new NioEventLoopGroup(DMLConfig.DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS);
+		final SslContext sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
+			.build();
 		try {
 			Bootstrap b = new Bootstrap();
 			final DataRequestHandler handler = new DataRequestHandler(workerGroup);
+			// Client Netty
 			b.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
 				@Override
-				public void initChannel(SocketChannel ch) {
-					ch.pipeline().addLast("ObjectDecoder",
-						new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingResolver(ClassLoader.getSystemClassLoader())))
-						.addLast("FederatedOperationHandler", handler)
-						.addLast("ObjectEncoder", new ObjectEncoder());
+				protected void initChannel(SocketChannel ch) throws Exception {
+					ChannelPipeline cp = ch.pipeline();
+					if(ConfigurationManager.getDMLConfig().getBooleanValue(DMLConfig.USE_SSL_FEDERATED_COMMUNICATION)) {
+						cp.addLast(
+							sslCtx.newHandler(ch.alloc(), address.getAddress().getHostAddress(), address.getPort()));
+					}
+
+					cp.addLast("ObjectDecoder",
+						new ObjectDecoder(Integer.MAX_VALUE,
+							ClassResolvers.weakCachingResolver(ClassLoader.getSystemClassLoader())));
+					cp.addLast("FederatedOperationHandler", handler);
+					cp.addLast("ObjectEncoder", new ObjectEncoder());
+
 				}
 			});
-			
+
 			ChannelFuture f = b.connect(address).sync();
 			Promise<FederatedResponse> promise = f.channel().eventLoop().newPromise();
 			handler.setPromise(promise);
 			f.channel().writeAndFlush(request);
 			return promise;
 		}
-		catch (InterruptedException e) {
+		catch(InterruptedException e) {
 			throw new DMLRuntimeException("Could not send federated operation.");
 		}
-		catch (Exception e) {
+		catch(Exception e) {
 			throw new DMLRuntimeException(e);
 		}
 	}
-	
+
 	public static void clearFederatedWorkers() {
-		if( _allFedSites.isEmpty() )
+		if(_allFedSites.isEmpty())
 			return;
-		
+
 		try {
-			//create and execute clear request on all workers
+			// create and execute clear request on all workers
 			FederatedRequest fr = new FederatedRequest(RequestType.CLEAR);
 			List<Future<FederatedResponse>> ret = new ArrayList<>();
-			for( InetSocketAddress address : _allFedSites )
+			for(InetSocketAddress address : _allFedSites)
 				ret.add(executeFederatedOperation(address, fr));
-			
-			//wait for successful completion
+
+			// wait for successful completion
 			FederationUtils.waitFor(ret);
 		}
 		catch(Exception ex) {
@@ -183,26 +208,26 @@
 			resetFederatedSites();
 		}
 	}
-	
+
 	public static void resetFederatedSites() {
 		_allFedSites.clear();
 	}
-	
+
 	private static class DataRequestHandler extends ChannelInboundHandlerAdapter {
 		private Promise<FederatedResponse> _prom;
 		private EventLoopGroup _workerGroup;
-		
+
 		public DataRequestHandler(EventLoopGroup workerGroup) {
 			_workerGroup = workerGroup;
 		}
-		
+
 		public void setPromise(Promise<FederatedResponse> prom) {
 			_prom = prom;
 		}
-		
+
 		@Override
 		public void channelRead(ChannelHandlerContext ctx, Object msg) {
-			if (_prom == null)
+			if(_prom == null)
 				throw new DMLRuntimeException("Read while no message was sent");
 			_prom.setSuccess((FederatedResponse) msg);
 			ctx.close();
@@ -211,11 +236,11 @@
 	}
 
 	@Override
-	public String toString(){
+	public String toString() {
 		StringBuilder sb = new StringBuilder();
 		sb.append(this.getClass().getSimpleName().toString());
-		sb.append(" "+ _dataType);
-		sb.append(" "+_address.toString());
+		sb.append(" " + _dataType);
+		sb.append(" " + _address.toString());
 		sb.append(":" + _filepath);
 		return sb.toString();
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
index 726198a..7c44593 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
@@ -19,10 +19,19 @@
 
 package org.apache.sysds.runtime.controlprogram.federated;
 
+import java.security.cert.CertificateException;
+
+import javax.net.ssl.SSLException;
+
+import org.apache.log4j.Logger;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.conf.DMLConfig;
+
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
@@ -30,44 +39,54 @@
 import io.netty.handler.codec.serialization.ClassResolvers;
 import io.netty.handler.codec.serialization.ObjectDecoder;
 import io.netty.handler.codec.serialization.ObjectEncoder;
-import org.apache.log4j.Logger;
-import org.apache.sysds.conf.DMLConfig;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
 
 public class FederatedWorker {
 	protected static Logger log = Logger.getLogger(FederatedWorker.class);
 
 	private int _port;
 	private final ExecutionContextMap _ecm;
-	
+
 	public FederatedWorker(int port) {
 		_ecm = new ExecutionContextMap();
 		_port = (port == -1) ? DMLConfig.DEFAULT_FEDERATED_PORT : port;
 	}
 
-	public void run() {
+	public void run() throws CertificateException, SSLException {
 		log.info("Setting up Federated Worker");
 		EventLoopGroup bossGroup = new NioEventLoopGroup(1);
 		EventLoopGroup workerGroup = new NioEventLoopGroup(1);
 		ServerBootstrap b = new ServerBootstrap();
-		b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
-			.childHandler(new ChannelInitializer<SocketChannel>() {
-				@Override
-				public void initChannel(SocketChannel ch) {
-					ch.pipeline()
-						.addLast("ObjectDecoder",
-							new ObjectDecoder(Integer.MAX_VALUE,
-								ClassResolvers.weakCachingResolver(ClassLoader.getSystemClassLoader())))
-						.addLast("ObjectEncoder", new ObjectEncoder())
-						.addLast("FederatedWorkerHandler", new FederatedWorkerHandler(_ecm));
-				}
-			}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
+		// TODO add ability to use real ssl files, not self signed certificates.
+		SelfSignedCertificate cert = new SelfSignedCertificate();
+		final SslContext cont2 = SslContextBuilder.forServer(cert.certificate(), cert.privateKey()).build();
+
 		try {
+			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+				.childHandler(new ChannelInitializer<SocketChannel>() {
+					@Override
+					public void initChannel(SocketChannel ch) {
+						ChannelPipeline cp = ch.pipeline();
+
+						if(ConfigurationManager.getDMLConfig()
+							.getBooleanValue(DMLConfig.USE_SSL_FEDERATED_COMMUNICATION)) {
+							cp.addLast(cont2.newHandler(ch.alloc()));
+						}
+						cp.addLast("ObjectDecoder",
+							new ObjectDecoder(Integer.MAX_VALUE,
+								ClassResolvers.weakCachingResolver(ClassLoader.getSystemClassLoader())));
+						cp.addLast("ObjectEncoder", new ObjectEncoder());
+						cp.addLast("FederatedWorkerHandler", new FederatedWorkerHandler(_ecm));
+					}
+				}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
 			log.info("Starting Federated Worker server at port: " + _port);
 			ChannelFuture f = b.bind(_port).sync();
 			log.info("Started Federated Worker at port: " + _port);
 			f.channel().closeFuture().sync();
 		}
-		catch (InterruptedException e) {
+		catch(Exception e) {
 			log.info("Federated worker interrupted");
 		}
 		finally {
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
index 8fa2063..d4dc464 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.sysds.test.functions.federated.io;
 
+
 import java.util.Arrays;
 import java.util.Collection;
 
@@ -60,7 +61,7 @@
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
 		// number of rows or cols has to be >= number of federated locations.
-		return Arrays.asList(new Object[][] {{10, 13, true, 2},});
+		return Arrays.asList(new Object[][] {{10, 13, true, 2}});
 	}
 
 	@Test
@@ -99,13 +100,14 @@
 			// Run reference dml script with normal matrix
 			fullDMLScriptName = SCRIPT_DIR + "functions/federated/io/" + TEST_NAME + (rowPartitioned ? "Row" : "Col")
 				+ "Reference.dml";
-			programArgs = new String[] {"-args", input("X1"), input("X2")};
+			programArgs = new String[] {"-stats", "-args", input("X1"), input("X2")};
 			String refOut = runTest(null).toString();
+			
 			// Run federated
 			fullDMLScriptName = SCRIPT_DIR + "functions/federated/io/" + TEST_NAME + ".dml";
 			programArgs = new String[] {"-stats", "-args", input("X.json")};
 			String out = runTest(null).toString();
-			// LOG.error(out);
+
 			Assert.assertTrue(heavyHittersContainsString("fed_uak+"));
 			// Verify output
 			Assert.assertEquals(Double.parseDouble(refOut.split("\n")[0]),
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java
new file mode 100644
index 0000000..40029f6
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysds.test.functions.federated.io;
+
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.apache.sysds.test.functions.federated.FederatedTestObjectConstructor;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedSSLTest extends AutomatedTestBase {
+
+	// private static final Log LOG = LogFactory.getLog(FederatedReaderTest.class.getName());
+	// This test use the same scripts as the Federated Reader tests, just with SSL enabled.
+	private final static String TEST_DIR = "functions/federated/io/";
+	private final static String TEST_NAME = "FederatedReaderTest";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedReaderTest.class.getSimpleName() + "/";
+	private final static int blocksize = 1024;
+	private final static File TEST_CONF_FILE = new File(SCRIPT_DIR + TEST_DIR + "SSLConfig.xml");
+
+	@Parameterized.Parameter()
+	public int rows;
+	@Parameterized.Parameter(1)
+	public int cols;
+	@Parameterized.Parameter(2)
+	public boolean rowPartitioned;
+	@Parameterized.Parameter(3)
+	public int fedCount;
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME));
+	}
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		// number of rows or cols has to be >= number of federated locations.
+		return Arrays.asList(new Object[][] {{10, 13, true, 2}});
+	}
+
+	@Test
+	public void federatedSinglenodeRead() {
+		federatedRead(Types.ExecMode.SINGLE_NODE);
+	}
+
+	public void federatedRead(Types.ExecMode execMode) {
+		Types.ExecMode oldPlatform = setExecMode(execMode);
+		getAndLoadTestConfiguration(TEST_NAME);
+		setOutputBuffering(true);
+		
+		// write input matrices
+		int halfRows = rows / 2;
+		long[][] begins = new long[][] {new long[] {0, 0}, new long[] {halfRows, 0}};
+		long[][] ends = new long[][] {new long[] {halfRows, cols}, new long[] {rows, cols}};
+		// We have two matrices handled by a single federated worker
+		double[][] X1 = getRandomMatrix(halfRows, cols, 0, 1, 1, 42);
+		double[][] X2 = getRandomMatrix(halfRows, cols, 0, 1, 1, 1340);
+		writeInputMatrixWithMTD("X1", X1, false, new MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
+		writeInputMatrixWithMTD("X2", X2, false, new MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
+		// empty script name because we don't execute any script, just start the worker
+		fullDMLScriptName = "";
+		int port1 = getRandomAvailablePort();
+		int port2 = getRandomAvailablePort();
+		Thread t1 = startLocalFedWorkerThread(port1);
+		Thread t2 = startLocalFedWorkerThread(port2);
+		String host = "localhost";
+
+		MatrixObject fed = FederatedTestObjectConstructor.constructFederatedInput(
+			rows, cols, blocksize, host, begins, ends, new int[] {port1, port2},
+			new String[] {input("X1"), input("X2")}, input("X.json"));
+		writeInputFederatedWithMTD("X.json", fed, null);
+
+		try {
+			// Run reference dml script with normal matrix
+			fullDMLScriptName = SCRIPT_DIR + "functions/federated/io/" + TEST_NAME + (rowPartitioned ? "Row" : "Col")
+				+ "Reference.dml";
+			programArgs = new String[] {"-stats", "-args", input("X1"), input("X2")};
+			String refOut = runTest(null).toString();
+			
+			// Run federated
+			fullDMLScriptName = SCRIPT_DIR + "functions/federated/io/" + TEST_NAME + ".dml";
+			programArgs = new String[] {"-stats", "-args", input("X.json")};
+			String out = runTest(null).toString();
+
+			Assert.assertTrue(heavyHittersContainsString("fed_uak+"));
+			// Verify output
+			Assert.assertEquals(Double.parseDouble(refOut.split("\n")[0]),
+				Double.parseDouble(out.split("\n")[0]), 0.00001);
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			Assert.assertTrue(false);
+		}
+		finally {
+			resetExecMode(oldPlatform);
+		}
+
+		TestUtils.shutdownThreads(t1, t2);
+	}
+
+	/**
+	 * Override default configuration with custom test configuration to ensure
+	 * scratch space and local temporary directory locations are also updated.
+	 */
+	@Override
+	protected File getConfigTemplateFile() {
+		// Instrumentation in this test's output log to show custom configuration file used for template.
+		System.out.println("This test case overrides default configuration with " + TEST_CONF_FILE.getPath());
+		return TEST_CONF_FILE;
+	}
+}
diff --git a/src/test/scripts/functions/federated/io/SSLConfig.xml b/src/test/scripts/functions/federated/io/SSLConfig.xml
new file mode 100644
index 0000000..f375ba3
--- /dev/null
+++ b/src/test/scripts/functions/federated/io/SSLConfig.xml
@@ -0,0 +1,21 @@
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+<root>
+    <sysds.federated.ssl>true</sysds.federated.ssl>
+</root>
\ No newline at end of file