[SYSTEMDS-2736] Federated Ternary aggregate

This commit adds Federated Ternary Aggregation, as well as change the
federated cleanup, to run in separated threads. The latter improve the
system performance by synchronizing and cleaning workers in parallel with
computation continuing.

Also a minior syntax error is corrected in builtin l2svm.

closes #1110
diff --git a/scripts/builtin/l2svm.dml b/scripts/builtin/l2svm.dml
index 775db95..9e0bf2a 100644
--- a/scripts/builtin/l2svm.dml
+++ b/scripts/builtin/l2svm.dml
@@ -135,7 +135,7 @@
 
     if(verbose) {
       colstr = ifelse(columnId!=-1, ", Col:"+columnId + " ,", " ,")
-      print("Iter:" + toString(iter) + "InnerIter:" + toString(iiter) +" --- "+ colstr + " Obj:" + obj)
+      print("Iter: " + toString(iter) + " InnerIter: " + toString(iiter) +" --- "+ colstr + " Obj:" + obj)
     }
 
     tmp = sum(s * g_old)
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
index 2ce3cb7..76521db 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
@@ -231,8 +231,14 @@
 		List<Future<FederatedResponse>> tmp = new ArrayList<>();
 		for(FederatedData fd : _fedMap.values())
 			tmp.add(fd.executeFederatedOperation(request));
-		// wait to avoid interference w/ following requests
-		FederationUtils.waitFor(tmp);
+		// This cleaning is allowed to go in a separate thread, and finish on its own.
+		// The benefit is that the program is able to continue working on other things.
+		// The downside is that at the end of execution these threads can have executed
+		// for some extra time that can in particular be noticeable for shorter federated jobs.
+
+		// To force the cleanup use waitFor -> drastically increasing execution time if
+		// communication is slow to federated sites.
+		// FederationUtils.waitFor(tmp);
 	}
 
 	private static FederatedRequest[] addAll(FederatedRequest a, FederatedRequest[] b) {
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateTernaryFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateTernaryFEDInstruction.java
new file mode 100644
index 0000000..2bc187e
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateTernaryFEDInstruction.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.concurrent.Future;
+
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
+import org.apache.sysds.runtime.instructions.cp.AggregateTernaryCPInstruction;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.cp.DoubleObject;
+import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+
+public class AggregateTernaryFEDInstruction extends FEDInstruction {
+	// private static final Log LOG = LogFactory.getLog(AggregateTernaryFEDInstruction.class.getName());
+
+	public final AggregateTernaryCPInstruction _ins;
+
+	protected AggregateTernaryFEDInstruction(AggregateTernaryCPInstruction ins) {
+		super(FEDType.AggregateTernary, ins.getOperator(), ins.getOpcode(), ins.getInstructionString());
+		_ins = ins;
+	}
+
+	public static AggregateTernaryFEDInstruction parseInstruction(AggregateTernaryCPInstruction ins) {
+		return new AggregateTernaryFEDInstruction(ins);
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		MatrixObject mo1 = ec.getMatrixObject(_ins.input1);
+		MatrixObject mo2 = ec.getMatrixObject(_ins.input2);
+		MatrixObject mo3 = _ins.input3.isLiteral() ? null : ec.getMatrixObject(_ins.input3);
+
+		if(mo1.isFederated() && mo2.isFederated() && mo1.getFedMapping().isAligned(mo2.getFedMapping(), false) &&
+			mo3 == null) {
+			FederatedRequest fr1 = mo1.getFedMapping().broadcast(ec.getScalarInput(_ins.input3));
+			FederatedRequest fr2 = FederationUtils.callInstruction(_ins.getInstructionString(),
+				_ins.getOutput(),
+				new CPOperand[] {_ins.input1, _ins.input2, _ins.input3},
+				new long[] {mo1.getFedMapping().getID(), mo2.getFedMapping().getID(), fr1.getID()});
+			FederatedRequest fr3 = new FederatedRequest(RequestType.GET_VAR, fr2.getID());
+			FederatedRequest fr4 = mo2.getFedMapping().cleanup(getTID(), fr1.getID(), fr2.getID());
+			Future<FederatedResponse>[] tmp = mo1.getFedMapping().execute(getTID(), fr1, fr2, fr3, fr4);
+
+			if(_ins.output.getDataType().isScalar()) {
+				double sum = 0;
+				for(Future<FederatedResponse> fr : tmp)
+					try {
+						sum += ((ScalarObject) fr.get().getData()[0]).getDoubleValue();
+					}
+					catch(Exception e) {
+						throw new DMLRuntimeException("Federated Get data failed with exception on TernaryFedInstruction", e);
+					}
+
+				ec.setScalarOutput(_ins.output.getName(), new DoubleObject(sum));
+			}
+			else {
+				throw new DMLRuntimeException("Not Implemented Federated Ternary Variation");
+			}
+		}
+		else {
+			if(mo3 == null)
+				throw new DMLRuntimeException("Federated AggregateTernary not supported with the "
+					+ "following federated objects: " + mo1.isFederated() + ":" + mo1.getFedMapping() + " "
+					+ mo2.isFederated() + ":" + mo2.getFedMapping());
+			else
+				throw new DMLRuntimeException("Federated AggregateTernary not supported with the "
+					+ "following federated objects: " + mo1.isFederated() + ":" + mo1.getFedMapping() + " "
+					+ mo2.isFederated() + ":" + mo2.getFedMapping() + mo3.isFederated() + ":" + mo3.getFedMapping());
+		}
+
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java
index 0d912d8..10b6147 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java
@@ -29,6 +29,7 @@
 	public enum FEDType {
 		AggregateBinary,
 		AggregateUnary,
+		AggregateTernary,
 		Append,
 		Binary,
 		Init,
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index c0481ab..b44aa54 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -28,6 +28,7 @@
 import org.apache.sysds.runtime.controlprogram.federated.FederationMap.FType;
 import org.apache.sysds.runtime.instructions.Instruction;
 import org.apache.sysds.runtime.instructions.cp.AggregateBinaryCPInstruction;
+import org.apache.sysds.runtime.instructions.cp.AggregateTernaryCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.AggregateUnaryCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.BinaryCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.Data;
@@ -158,7 +159,6 @@
 		}
 		else if(inst instanceof VariableCPInstruction ){
 			VariableCPInstruction ins = (VariableCPInstruction) inst;
-
 			if(ins.getVariableOpcode() == VariableOperationCode.Write 
 				&& ins.getInput1().isMatrix()
 				&& ins.getInput3().getName().contains("federated")){
@@ -175,6 +175,13 @@
 				fedinst = VariableFEDInstruction.parseInstruction(ins);
 			}
 		}
+		else if(inst instanceof AggregateTernaryCPInstruction){
+			AggregateTernaryCPInstruction ins = (AggregateTernaryCPInstruction) inst;
+			if(ins.input1.isMatrix() && ec.getCacheableData(ins.input1).isFederated() && ins.input2.isMatrix() &&
+				ec.getCacheableData(ins.input2).isFederated()) {
+				fedinst = AggregateTernaryFEDInstruction.parseInstruction(ins);
+			}
+		}
 
 		//set thread id for federated context management
 		if( fedinst != null ) {
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
index 1f152e9..3dae771 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
@@ -60,13 +60,14 @@
 public class InitFEDInstruction extends FEDInstruction {
 
 	private static final Log LOG = LogFactory.getLog(InitFEDInstruction.class.getName());
-	
+
 	public static final String FED_MATRIX_IDENTIFIER = "matrix";
 	public static final String FED_FRAME_IDENTIFIER = "frame";
 
 	private CPOperand _type, _addresses, _ranges, _output;
 
-	public InitFEDInstruction(CPOperand type, CPOperand addresses, CPOperand ranges, CPOperand out, String opcode, String instr) {
+	public InitFEDInstruction(CPOperand type, CPOperand addresses, CPOperand ranges, CPOperand out, String opcode,
+		String instr) {
 		super(FEDType.Init, opcode, instr);
 		_type = type;
 		_addresses = addresses;
@@ -78,7 +79,7 @@
 		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
 		// We need 5 parts: Opcode, Type (Frame/Matrix), Addresses (list of Strings with
 		// url/ip:port/filepath), ranges and the output Operand
-		if (parts.length != 5)
+		if(parts.length != 5)
 			throw new DMLRuntimeException("Invalid number of operands in federated instruction: " + str);
 		String opcode = parts[0];
 
@@ -97,22 +98,22 @@
 		ListObject ranges = ec.getListObject(_ranges.getName());
 		List<Pair<FederatedRange, FederatedData>> feds = new ArrayList<>();
 
-		if (addresses.getLength() * 2 != ranges.getLength())
+		if(addresses.getLength() * 2 != ranges.getLength())
 			throw new DMLRuntimeException("Federated read needs twice the amount of addresses as ranges "
 				+ "(begin and end): addresses=" + addresses.getLength() + " ranges=" + ranges.getLength());
-		
+
 		Types.DataType fedDataType;
-		if (type.equalsIgnoreCase(FED_MATRIX_IDENTIFIER))
+		if(type.equalsIgnoreCase(FED_MATRIX_IDENTIFIER))
 			fedDataType = Types.DataType.MATRIX;
-		else if (type.equalsIgnoreCase(FED_FRAME_IDENTIFIER))
+		else if(type.equalsIgnoreCase(FED_FRAME_IDENTIFIER))
 			fedDataType = Types.DataType.FRAME;
 		else
 			throw new DMLRuntimeException("type \"" + type + "\" non valid federated type");
-		
-		long[] usedDims = new long[] { 0, 0 };
-		for (int i = 0; i < addresses.getLength(); i++) {
+
+		long[] usedDims = new long[] {0, 0};
+		for(int i = 0; i < addresses.getLength(); i++) {
 			Data addressData = addresses.getData().get(i);
-			if (addressData instanceof StringObject) {
+			if(addressData instanceof StringObject) {
 				// We split address into url/ip, the port and file path of file to read
 				String[] parsedValues = parseURL(((StringObject) addressData).getStringValue());
 				String host = parsedValues[0];
@@ -122,7 +123,7 @@
 				List<Data> rangesData = ranges.getData();
 				Data beginData = rangesData.get(i * 2);
 				Data endData = rangesData.get(i * 2 + 1);
-				if (beginData.getDataType() != Types.DataType.LIST || endData.getDataType() != Types.DataType.LIST)
+				if(beginData.getDataType() != Types.DataType.LIST || endData.getDataType() != Types.DataType.LIST)
 					throw new DMLRuntimeException(
 						"Federated read ranges (lower, upper) have to be lists of dimensions");
 				List<Data> beginDimsData = ((ListObject) beginData).getData();
@@ -131,7 +132,7 @@
 				// fill begin and end dims
 				long[] beginDims = new long[beginDimsData.size()];
 				long[] endDims = new long[beginDims.length];
-				for (int d = 0; d < beginDims.length; d++) {
+				for(int d = 0; d < beginDims.length; d++) {
 					beginDims[d] = ((ScalarObject) beginDimsData.get(d)).getLongValue();
 					endDims[d] = ((ScalarObject) endDimsData.get(d)).getLongValue();
 				}
@@ -142,7 +143,7 @@
 						new InetSocketAddress(InetAddress.getByName(host), port), filePath);
 					feds.add(new ImmutablePair<>(new FederatedRange(beginDims, endDims), federatedData));
 				}
-				catch (UnknownHostException e) {
+				catch(UnknownHostException e) {
 					throw new DMLRuntimeException("federated host was unknown: " + host);
 				}
 			}
@@ -150,15 +151,15 @@
 				throw new DMLRuntimeException("federated instruction only takes strings as addresses");
 			}
 		}
-		if (type.equalsIgnoreCase(FED_MATRIX_IDENTIFIER)) {
+		if(type.equalsIgnoreCase(FED_MATRIX_IDENTIFIER)) {
 			CacheableData<?> output = ec.getCacheableData(_output);
 			output.getDataCharacteristics().setRows(usedDims[0]).setCols(usedDims[1]);
 			federateMatrix(output, feds);
 		}
-		else if (type.equalsIgnoreCase(FED_FRAME_IDENTIFIER)) {
-			if (usedDims[1] > Integer.MAX_VALUE)
-				throw new DMLRuntimeException("federated Frame can not have more than max int columns, because the " +
-						"schema can only be max int length");
+		else if(type.equalsIgnoreCase(FED_FRAME_IDENTIFIER)) {
+			if(usedDims[1] > Integer.MAX_VALUE)
+				throw new DMLRuntimeException("federated Frame can not have more than max int columns, because the "
+					+ "schema can only be max int length");
 			FrameObject output = ec.getFrameObject(_output);
 			output.getDataCharacteristics().setRows(usedDims[0]).setCols(usedDims[1]);
 			federateFrame(output, feds);
@@ -170,23 +171,23 @@
 
 	public static String[] parseURL(String input) {
 		try {
-			// Artificially making it http protocol. 
+			// Artificially making it http protocol.
 			// This is to avoid malformed address error in the URL passing.
 			// TODO: Construct new protocol name for Federated communication
 			URL address = new URL("http://" + input);
 			String host = address.getHost();
-			if (host.length() == 0)
+			if(host.length() == 0)
 				throw new IllegalArgumentException("Missing Host name for federated address");
 			// The current system does not support ipv6, only ipv4.
 			// TODO: Support IPV6 address for Federated communication
 			String ipRegex = "^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$";
-			if (host.matches("^\\d+\\.\\d+\\.\\d+\\.\\d+$") && !host.matches(ipRegex))
+			if(host.matches("^\\d+\\.\\d+\\.\\d+\\.\\d+$") && !host.matches(ipRegex))
 				throw new IllegalArgumentException("Input Host address looks like an IP address but is outside range");
 			int port = address.getPort();
-			if (port == -1)
+			if(port == -1)
 				port = DMLConfig.DEFAULT_FEDERATED_PORT;
 			String filePath = address.getPath();
-			if (filePath.length() <= 1)
+			if(filePath.length() <= 1)
 				throw new IllegalArgumentException("Missing File path for federated address");
 			// Remove the first character making the path Dynamic from the location of the worker.
 			// This is in contrast to before where it was static paths
@@ -194,38 +195,38 @@
 			// To make static file paths use double "//" EG:
 			// example.dom//staticFile.txt
 			// example.dom/dynamicFile.txt
-			if (address.getQuery() != null)
+			if(address.getQuery() != null)
 				throw new IllegalArgumentException("Query is not supported");
 
-			if (address.getRef() != null)
+			if(address.getRef() != null)
 				throw new IllegalArgumentException("Reference is not supported");
-			
-			return new String[] { host, String.valueOf(port), filePath };
+
+			return new String[] {host, String.valueOf(port), filePath};
 		}
-		catch (MalformedURLException e) {
-			throw new IllegalArgumentException("federated address `" + input
-				+ "` does not fit required URL pattern of \"host:port/directory\"", e);
+		catch(MalformedURLException e) {
+			throw new IllegalArgumentException(
+				"federated address `" + input + "` does not fit required URL pattern of \"host:port/directory\"", e);
 		}
 	}
 
 	public static void federateMatrix(CacheableData<?> output, List<Pair<FederatedRange, FederatedData>> workers) {
 
 		Map<FederatedRange, FederatedData> fedMapping = new TreeMap<>();
-		for (Pair<FederatedRange, FederatedData> t : workers) {
+		for(Pair<FederatedRange, FederatedData> t : workers) {
 			fedMapping.put(t.getLeft(), t.getRight());
 		}
 		List<Pair<FederatedData, Future<FederatedResponse>>> idResponses = new ArrayList<>();
 		long id = FederationUtils.getNextFedDataID();
 		boolean rowPartitioned = true;
 		boolean colPartitioned = true;
-		for (Map.Entry<FederatedRange, FederatedData> entry : fedMapping.entrySet()) {
+		for(Map.Entry<FederatedRange, FederatedData> entry : fedMapping.entrySet()) {
 			FederatedRange range = entry.getKey();
 			FederatedData value = entry.getValue();
-			if (!value.isInitialized()) {
+			if(!value.isInitialized()) {
 				long[] beginDims = range.getBeginDims();
 				long[] endDims = range.getEndDims();
 				long[] dims = output.getDataCharacteristics().getDims();
-				for (int i = 0; i < dims.length; i++)
+				for(int i = 0; i < dims.length; i++)
 					dims[i] = endDims[i] - beginDims[i];
 				idResponses.add(new ImmutablePair<>(value, value.initFederatedData(id)));
 			}
@@ -233,27 +234,32 @@
 			colPartitioned &= (range.getSize(0) == output.getNumRows());
 		}
 		try {
-			int timeout = ConfigurationManager.getDMLConfig().getIntValue(DMLConfig.DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT);
+			int timeout = ConfigurationManager.getDMLConfig()
+				.getIntValue(DMLConfig.DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT);
 			LOG.debug("Federated Initialization with timeout: " + timeout);
-			for (Pair<FederatedData, Future<FederatedResponse>> idResponse : idResponses)
-				idResponse.getRight().get(timeout,TimeUnit.SECONDS); //wait for initialization
+			for(Pair<FederatedData, Future<FederatedResponse>> idResponse : idResponses)
+				idResponse.getRight().get(timeout, TimeUnit.SECONDS); // wait for initialization
 		}
-		catch (TimeoutException e){
+		catch(TimeoutException e) {
 			throw new DMLRuntimeException("Federated Initialization timeout exceeded", e);
 		}
-		catch (Exception e) {
+		catch(Exception e) {
 			throw new DMLRuntimeException("Federation initialization failed", e);
 		}
 		output.getDataCharacteristics().setNonZeros(-1);
 		output.getDataCharacteristics().setBlocksize(ConfigurationManager.getBlocksize());
 		output.setFedMapping(new FederationMap(id, fedMapping));
-		output.getFedMapping().setType(rowPartitioned && colPartitioned ? FType.FULL : 
-			rowPartitioned ? FType.ROW : colPartitioned ? FType.COL : FType.OTHER);
+
+		output.getFedMapping().setType(rowPartitioned &&
+			colPartitioned ? FType.FULL : rowPartitioned ? FType.ROW : colPartitioned ? FType.COL : FType.OTHER);
+
+		if(LOG.isDebugEnabled())
+			LOG.debug("Fed map Inited:" + output.getFedMapping());
 	}
-	
+
 	public static void federateFrame(FrameObject output, List<Pair<FederatedRange, FederatedData>> workers) {
 		Map<FederatedRange, FederatedData> fedMapping = new TreeMap<>();
-		for (Pair<FederatedRange, FederatedData> t : workers) {
+		for(Pair<FederatedRange, FederatedData> t : workers) {
 			fedMapping.put(t.getLeft(), t.getRight());
 		}
 		// we want to wait for the futures with the response containing varIDs and the schemas of the frames
@@ -263,17 +269,18 @@
 		long id = FederationUtils.getNextFedDataID();
 		boolean rowPartitioned = true;
 		boolean colPartitioned = true;
-		for (Map.Entry<FederatedRange, FederatedData> entry : fedMapping.entrySet()) {
+		for(Map.Entry<FederatedRange, FederatedData> entry : fedMapping.entrySet()) {
 			FederatedRange range = entry.getKey();
 			FederatedData value = entry.getValue();
-			if (!value.isInitialized()) {
+			if(!value.isInitialized()) {
 				long[] beginDims = range.getBeginDims();
 				long[] endDims = range.getEndDims();
 				long[] dims = output.getDataCharacteristics().getDims();
-				for (int i = 0; i < dims.length; i++) {
+				for(int i = 0; i < dims.length; i++) {
 					dims[i] = endDims[i] - beginDims[i];
 				}
-				idResponses.add(new ImmutablePair<>(value, new ImmutablePair<>((int) beginDims[1], value.initFederatedData(id))));
+				idResponses.add(
+					new ImmutablePair<>(value, new ImmutablePair<>((int) beginDims[1], value.initFederatedData(id))));
 			}
 			rowPartitioned &= (range.getSize(1) == output.getNumColumns());
 			colPartitioned &= (range.getSize(0) == output.getNumRows());
@@ -282,23 +289,26 @@
 		Types.ValueType[] schema = new Types.ValueType[(int) output.getNumColumns()];
 		Arrays.fill(schema, Types.ValueType.UNKNOWN);
 		try {
-			for (Pair<FederatedData, Pair<Integer, Future<FederatedResponse>>> idResponse : idResponses) {
+			for(Pair<FederatedData, Pair<Integer, Future<FederatedResponse>>> idResponse : idResponses) {
 				FederatedData fedData = idResponse.getLeft();
 				FederatedResponse response = idResponse.getRight().getRight().get();
 				int startCol = idResponse.getRight().getLeft();
 				handleFedFrameResponse(schema, fedData, response, startCol);
 			}
 		}
-		catch (Exception e) {
+		catch(Exception e) {
 			throw new DMLRuntimeException("Federation initialization failed", e);
 		}
 		output.getDataCharacteristics().setNonZeros(output.getNumColumns() * output.getNumRows());
 		output.setSchema(schema);
 		output.setFedMapping(new FederationMap(id, fedMapping));
-		output.getFedMapping().setType(rowPartitioned && colPartitioned ? FType.FULL : 
-			rowPartitioned ? FType.ROW : colPartitioned ? FType.COL : FType.OTHER);
+		output.getFedMapping().setType(rowPartitioned &&
+			colPartitioned ? FType.FULL : rowPartitioned ? FType.ROW : colPartitioned ? FType.COL : FType.OTHER);
+
+		if(LOG.isDebugEnabled())
+			LOG.debug("Fed map Inited: " + output.getFedMapping());
 	}
-	
+
 	private static void handleFedFrameResponse(Types.ValueType[] schema, FederatedData federatedData,
 		FederatedResponse response, int startColumn) {
 		try {
@@ -315,7 +325,8 @@
 				else
 					schema[schema_index] = vType;
 			}
-		} catch (Exception e){
+		}
+		catch(Exception e) {
 			throw new DMLRuntimeException("Exception in frame response from federated worker.", e);
 		}
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/VariableFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/VariableFEDInstruction.java
index b45654d..da25122 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/VariableFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/VariableFEDInstruction.java
@@ -61,18 +61,15 @@
 	public void processInstruction(ExecutionContext ec) {
 		VariableOperationCode opcode = _in.getVariableOpcode();
 		switch(opcode) {
-
 			case Write:
 				processWriteInstruction(ec);
 				break;
-
 			case CastAsMatrixVariable:
 				processCastAsMatrixVariableInstruction(ec);
 				break;
 			case CastAsFrameVariable:
 				processCastAsFrameVariableInstruction(ec);
 				break;
-
 			default:
 				throw new DMLRuntimeException("Unsupported Opcode for federated Variable Instruction : " + opcode);
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/io/ReaderWriterFederated.java b/src/main/java/org/apache/sysds/runtime/io/ReaderWriterFederated.java
index 1361637..0527d23 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderWriterFederated.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderWriterFederated.java
@@ -18,8 +18,6 @@
  */
 package org.apache.sysds.runtime.io;
 
-import static org.junit.Assert.fail;
-
 import java.io.BufferedWriter;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -117,7 +115,7 @@
 			IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
 		}
 		catch(IOException e) {
-			fail("Unable to write test federated matrix to (" + file + "): " + e.getMessage());
+			throw new DMLRuntimeException("Unable to write test federated matrix to (" + file + "): " + e.getMessage());
 		}
 	}