[SYSTEMDS-2548,2796] Federated frame and matrix right indexing

Closes #1142.
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
index 89bf691..fa4ea24 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
@@ -25,7 +25,9 @@
 import org.apache.sysds.common.Types;
 import org.apache.sysds.common.Types.AggOp;
 import org.apache.sysds.common.Types.CorrectionLocationType;
+import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.Direction;
+import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.lops.Lop;
 import org.apache.sysds.lops.WeightedCrossEntropy;
 import org.apache.sysds.lops.WeightedCrossEntropyR;
@@ -976,6 +978,10 @@
 		}
 	}
 	
+	public static String createLiteralOperand(String val, ValueType vt) {
+		return InstructionUtils.concatOperandParts(val, DataType.SCALAR.name(), vt.name(), "true");
+	}
+	
 	public static String replaceOperand(String instStr, int operand, String newValue) {
 		//split instruction and check for correctness
 		String[] parts = instStr.split(Lop.OPERAND_DELIMITOR);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index 34f40bb..fbdb3a2 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -33,7 +33,6 @@
 import org.apache.sysds.runtime.instructions.cp.IndexingCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.MMChainCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.MMTSJCPInstruction;
-import org.apache.sysds.runtime.instructions.cp.MatrixIndexingCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.MultiReturnParameterizedBuiltinCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.ParameterizedBuiltinCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.QuaternaryCPInstruction;
@@ -152,12 +151,13 @@
 				}
 			}
 		}
-		else if(inst instanceof MatrixIndexingCPInstruction) {
-			// matrix indexing
-			MatrixIndexingCPInstruction minst = (MatrixIndexingCPInstruction) inst;
+		else if(inst instanceof IndexingCPInstruction) {
+			// matrix and frame indexing
+			IndexingCPInstruction minst = (IndexingCPInstruction) inst;
 			if(inst.getOpcode().equalsIgnoreCase("rightIndex")
-				&& minst.input1.isMatrix() && ec.getCacheableData(minst.input1).isFederated()) {
-				fedinst = MatrixIndexingFEDInstruction.parseInstruction(minst.getInstructionString());
+				&& (minst.input1.isMatrix() || minst.input1.isFrame())
+				&& ec.getCacheableData(minst.input1).isFederated()) {
+				fedinst = IndexingFEDInstruction.parseInstruction(minst.getInstructionString());
 			}
 		}
 		else if(inst instanceof VariableCPInstruction ){
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/IndexingFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/IndexingFEDInstruction.java
index a4aadbc..f8ab210 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/IndexingFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/IndexingFEDInstruction.java
@@ -19,16 +19,30 @@
 
 package org.apache.sysds.runtime.instructions.fed;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.lops.LeftIndex;
+import org.apache.sysds.lops.Lop;
 import org.apache.sysds.lops.RightIndex;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
+import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.util.IndexRange;
 
-public abstract class IndexingFEDInstruction extends UnaryFEDInstruction {
+public final class IndexingFEDInstruction extends UnaryFEDInstruction {
 	protected final CPOperand rowLower, rowUpper, colLower, colUpper;
 
 	protected IndexingFEDInstruction(CPOperand in, CPOperand rl, CPOperand ru, CPOperand cl, CPOperand cu,
@@ -70,10 +84,11 @@
 				cl = new CPOperand(parts[4]);
 				cu = new CPOperand(parts[5]);
 				out = new CPOperand(parts[6]);
-				if(in.getDataType() == Types.DataType.MATRIX)
-					return new MatrixIndexingFEDInstruction(in, rl, ru, cl, cu, out, opcode, str);
-				else
-					throw new DMLRuntimeException("Can index only on matrices, frames, and lists in federated.");
+
+				if(in.getDataType() != Types.DataType.MATRIX && in.getDataType() != Types.DataType.FRAME)
+					throw new DMLRuntimeException("Can index only on matrices, frames in federated.");
+
+				return new IndexingFEDInstruction(in, rl, ru, cl, cu, out, opcode, str);
 			}
 			else {
 				throw new DMLRuntimeException("Invalid number of operands in instruction: " + str);
@@ -86,4 +101,74 @@
 			throw new DMLRuntimeException("Unknown opcode while parsing a MatrixIndexingFEDInstruction: " + str);
 		}
 	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		rightIndexing(ec);
+	}
+
+	private void rightIndexing(ExecutionContext ec)
+	{
+		//get input and requested index range
+		CacheableData<?> in = ec.getCacheableData(input1);
+		IndexRange ixrange = getIndexRange(ec);
+
+		//prepare output federation map (copy-on-write)
+		FederationMap fedMap = in.getFedMapping().filter(ixrange);
+
+		//modify federated ranges in place
+		String[] instStrings = new String[fedMap.getSize()];
+
+		//create new frame schema
+		List<Types.ValueType> schema = new ArrayList<>();
+
+		// replace old reshape values for each worker
+		int i = 0;
+		for(FederatedRange range : fedMap.getMap().keySet()) {
+			long rs = range.getBeginDims()[0], re = range.getEndDims()[0],
+				cs = range.getBeginDims()[1], ce = range.getEndDims()[1];
+			long rsn = (ixrange.rowStart >= rs) ? (ixrange.rowStart - rs) : 0;
+			long ren = (ixrange.rowEnd >= rs && ixrange.rowEnd < re) ? (ixrange.rowEnd - rs) : (re - rs - 1);
+			long csn = (ixrange.colStart >= cs) ? (ixrange.colStart - cs) : 0;
+			long cen = (ixrange.colEnd >= cs && ixrange.colEnd < ce) ? (ixrange.colEnd - cs) : (ce - cs - 1);
+
+			range.setBeginDim(0, Math.max(rs - ixrange.rowStart, 0));
+			range.setBeginDim(1, Math.max(cs - ixrange.colStart, 0));
+			range.setEndDim(0, (ixrange.rowEnd >= re ? re-ixrange.rowStart : ixrange.rowEnd-ixrange.rowStart + 1));
+			range.setEndDim(1, (ixrange.colEnd >= ce ? ce-ixrange.colStart : ixrange.colEnd-ixrange.colStart + 1));
+
+			long[] newIx = new long[]{rsn, ren, csn, cen};
+
+			// change 4 indices in instString
+			instStrings[i] = instString;
+			String[] instParts = instString.split(Lop.OPERAND_DELIMITOR);
+			for(int j = 3; j < 7; j++)
+				instParts[j] = InstructionUtils.createLiteralOperand(String.valueOf(newIx[j-3]+1), ValueType.INT64);
+			instStrings[i] = String.join(Lop.OPERAND_DELIMITOR, instParts);
+			
+			if(input1.isFrame()) {
+				//modify frame schema
+				if(in.isFederated(FederationMap.FType.ROW))
+					schema = Arrays.asList(((FrameObject) in).getSchema((int) csn, (int) cen));
+				else
+					Collections.addAll(schema, ((FrameObject) in).getSchema((int) csn, (int) cen));
+			}
+			i++;
+		}
+		FederatedRequest[] fr1 = FederationUtils.callInstruction(instStrings,
+			output, new CPOperand[] {input1}, new long[] {fedMap.getID()});
+		fedMap.execute(getTID(), true, fr1, new FederatedRequest[0]);
+
+		if(input1.isFrame()) {
+			FrameObject out = ec.getFrameObject(output);
+			out.setSchema(schema.toArray(new Types.ValueType[0]));
+			out.getDataCharacteristics().setDimension(fedMap.getMaxIndexInRange(0), fedMap.getMaxIndexInRange(1));
+			out.setFedMapping(fedMap.copyWithNewID(fr1[0].getID()));
+		} else {
+			MatrixObject out = ec.getMatrixObject(output);
+			out.getDataCharacteristics().set(fedMap.getMaxIndexInRange(0), fedMap.getMaxIndexInRange(1),
+				(int) ((MatrixObject)in).getBlocksize());
+			out.setFedMapping(fedMap.copyWithNewID(fr1[0].getID()));
+		}
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/MatrixIndexingFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/MatrixIndexingFEDInstruction.java
deleted file mode 100644
index 4de60b4..0000000
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/MatrixIndexingFEDInstruction.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sysds.runtime.instructions.fed;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sysds.runtime.DMLRuntimeException;
-import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
-import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
-import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
-import org.apache.sysds.runtime.controlprogram.federated.FederatedUDF;
-import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
-import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
-import org.apache.sysds.runtime.instructions.cp.CPOperand;
-import org.apache.sysds.runtime.instructions.cp.Data;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.util.IndexRange;
-
-public final class MatrixIndexingFEDInstruction extends IndexingFEDInstruction {
-	private static final Log LOG = LogFactory.getLog(MatrixIndexingFEDInstruction.class.getName());
-
-	public MatrixIndexingFEDInstruction(CPOperand in, CPOperand rl, CPOperand ru, CPOperand cl, CPOperand cu,
-		CPOperand out, String opcode, String istr) {
-		super(in, rl, ru, cl, cu, out, opcode, istr);
-	}
-
-	@Override
-	public void processInstruction(ExecutionContext ec) {
-		rightIndexing(ec);
-	}
-
-	private void rightIndexing(ExecutionContext ec)
-	{
-		//get input and requested index range
-		MatrixObject in = ec.getMatrixObject(input1);
-		IndexRange ixrange = getIndexRange(ec);
-		
-		//prepare output federation map (copy-on-write)
-		FederationMap fedMap = in.getFedMapping().filter(ixrange);
-		
-		//modify federated ranges in place
-		Map<FederatedRange, IndexRange> ixs = new HashMap<>();
-		for(FederatedRange range : fedMap.getMap().keySet()) {
-			long rs = range.getBeginDims()[0], re = range.getEndDims()[0],
-				cs = range.getBeginDims()[1], ce = range.getEndDims()[1];
-			long rsn = (ixrange.rowStart >= rs) ? (ixrange.rowStart - rs) : 0;
-			long ren = (ixrange.rowEnd >= rs && ixrange.rowEnd < re) ? (ixrange.rowEnd - rs) : (re - rs - 1);
-			long csn = (ixrange.colStart >= cs) ? (ixrange.colStart - cs) : 0;
-			long cen = (ixrange.colEnd >= cs && ixrange.colEnd < ce) ? (ixrange.colEnd - cs) : (ce - cs - 1);
-			if(LOG.isDebugEnabled()) {
-				LOG.debug("Ranges for fed location: " + rsn + " " + ren + " " + csn + " " + cen);
-				LOG.debug("ixRange                : " + ixrange);
-				LOG.debug("Fed Mapping            : " + range);
-			}
-			range.setBeginDim(0, Math.max(rs - ixrange.rowStart, 0));
-			range.setBeginDim(1, Math.max(cs - ixrange.colStart, 0));
-			range.setEndDim(0, (ixrange.rowEnd >= re ? re-ixrange.rowStart : ixrange.rowEnd-ixrange.rowStart + 1));
-			range.setEndDim(1, (ixrange.colEnd >= ce ? ce-ixrange.colStart : ixrange.colEnd-ixrange.colStart + 1));
-			if(LOG.isDebugEnabled())
-				LOG.debug("Fed Mapping After      : " + range);
-			ixs.put(range, new IndexRange(rsn, ren, csn, cen));
-		}
-
-		// execute slicing of valid range 
-		long varID = FederationUtils.getNextFedDataID();
-		FederationMap slicedFedMap = fedMap.mapParallel(varID, (range, data) -> {
-			try {
-				FederatedResponse response = data.executeFederatedOperation(new FederatedRequest(
-					FederatedRequest.RequestType.EXEC_UDF, -1,
-					new SliceMatrix(data.getVarID(), varID, ixs.get(range)))).get();
-				if(!response.isSuccessful())
-					response.throwExceptionFromResponse();
-				return null;
-			}
-			catch(Exception e) {
-				throw new DMLRuntimeException(e);
-			}
-		});
-
-		//update output mapping and data characteristics
-		MatrixObject sliced = ec.getMatrixObject(output);
-		sliced.getDataCharacteristics()
-			.set(slicedFedMap.getMaxIndexInRange(0), slicedFedMap.getMaxIndexInRange(1), (int) in.getBlocksize());
-		sliced.setFedMapping(slicedFedMap);
-		
-		//TODO is this really necessary
-		if(ixrange.rowEnd - ixrange.rowStart == 0)
-			slicedFedMap.setType(FederationMap.FType.COL);
-		else if(ixrange.colEnd - ixrange.colStart == 0)
-			slicedFedMap.setType(FederationMap.FType.ROW);
-	}
-
-	private static class SliceMatrix extends FederatedUDF {
-
-		private static final long serialVersionUID = 5956832933333848772L;
-		private final long _outputID;
-		private final IndexRange _ixrange;
-
-		private SliceMatrix(long input, long outputID, IndexRange ixrange) {
-			super(new long[] {input});
-			_outputID = outputID;
-			_ixrange = ixrange;
-		}
-
-		@Override
-		public FederatedResponse execute(ExecutionContext ec, Data... data) {
-			MatrixBlock mb = ((MatrixObject) data[0]).acquireReadAndRelease();
-			MatrixBlock res = mb.slice(_ixrange, new MatrixBlock());
-			MatrixObject mout = ExecutionContext.createMatrixObject(res);
-			ec.setVariable(String.valueOf(_outputID), mout);
-
-			return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS_EMPTY);
-		}
-	}
-}
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java
index 1401792..e950c20 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRightIndexTest.java
@@ -44,6 +44,7 @@
 	private final static String TEST_NAME1 = "FederatedRightIndexRightTest";
 	private final static String TEST_NAME2 = "FederatedRightIndexLeftTest";
 	private final static String TEST_NAME3 = "FederatedRightIndexFullTest";
+	private final static String TEST_NAME4 = "FederatedRightIndexFrameFullTest";
 
 	private final static String TEST_DIR = "functions/federated/";
 	private static final String TEST_CLASS_DIR = TEST_DIR + FederatedRightIndexTest.class.getSimpleName() + "/";
@@ -65,37 +66,49 @@
 
 	@Parameterized.Parameters
 	public static Collection<Object[]> data() {
-		return Arrays.asList(new Object[][] {{20, 10, 1, 1, true}, {20, 10, 3, 5, true}, {10, 12, 1, 10, false}});
+		return Arrays.asList(new Object[][] {
+			{20, 10, 1, 1, true}, {20, 10, 3, 5, true},
+			{10, 12, 1, 10, false}});
 	}
 
 	private enum IndexType {
 		RIGHT, LEFT, FULL
 	}
 
+	private enum DataType {
+		MATRIX, FRAME
+	}
+
 	@Override
 	public void setUp() {
 		TestUtils.clearAssertionInformation();
 		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"S"}));
 		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"S"}));
 		addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {"S"}));
+		addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {"S"}));
 	}
 
 	// @Test
 	// public void testRightIndexRightDenseMatrixCP() {
-	// runAggregateOperationTest(IndexType.RIGHT, ExecMode.SINGLE_NODE);
+	// runAggregateOperationTest(IndexType.RIGHT, DataType.MATRIX, ExecMode.SINGLE_NODE);
 	// }
 
 	// @Test
 	// public void testRightIndexLeftDenseMatrixCP() {
-	// runAggregateOperationTest(IndexType.LEFT, ExecMode.SINGLE_NODE);
+	// runAggregateOperationTest(IndexType.LEFT, DataType.MATRIX, ExecMode.SINGLE_NODE);
 	// }
 
 	@Test
 	public void testRightIndexFullDenseMatrixCP() {
-		runAggregateOperationTest(IndexType.FULL, ExecMode.SINGLE_NODE);
+		runAggregateOperationTest(IndexType.FULL, DataType.MATRIX, ExecMode.SINGLE_NODE);
 	}
 
-	private void runAggregateOperationTest(IndexType type, ExecMode execMode) {
+	@Test
+	public void testRightIndexFullDenseFrameCP() {
+		runAggregateOperationTest(IndexType.FULL, DataType.FRAME, ExecMode.SINGLE_NODE);
+	}
+
+	private void runAggregateOperationTest(IndexType indexType, DataType dataType, ExecMode execMode) {
 		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
 		ExecMode platformOld = rtplatform;
 
@@ -103,7 +116,7 @@
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 
 		String TEST_NAME = null;
-		switch(type) {
+		switch(indexType) {
 			case RIGHT:
 				from = from <= cols ? from : cols;
 				to = to <= cols ? to : cols;
@@ -115,7 +128,10 @@
 				TEST_NAME = TEST_NAME2;
 				break;
 			case FULL:
-				TEST_NAME = TEST_NAME3;
+				if(dataType == DataType.MATRIX)
+					TEST_NAME = TEST_NAME3;
+				else
+					TEST_NAME = TEST_NAME4;
 				from = from <= rows && from <= cols ? from : Math.min(rows, cols);
 				to = to <= rows && to <= cols ? to : Math.min(rows, cols);
 				break;
diff --git a/src/test/scripts/functions/federated/FederatedRightIndexFrameFullTest.dml b/src/test/scripts/functions/federated/FederatedRightIndexFrameFullTest.dml
new file mode 100644
index 0000000..1470633
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedRightIndexFrameFullTest.dml
@@ -0,0 +1,40 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+from = $from;
+to = $to;
+
+if ($rP) {
+  A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+    ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+  A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+    ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+    list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+}
+
+A = as.frame(A)
+
+s = A[from:to, from:to];
+write(s, $out_S);
+
+print(toString(s))
diff --git a/src/test/scripts/functions/federated/FederatedRightIndexFrameFullTestReference.dml b/src/test/scripts/functions/federated/FederatedRightIndexFrameFullTestReference.dml
new file mode 100644
index 0000000..065f213
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedRightIndexFrameFullTestReference.dml
@@ -0,0 +1,35 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+from = $5;
+to = $6;
+
+if($7)
+  A = rbind(read($1), read($2), read($3), read($4));
+else
+  A = cbind(read($1), read($2), read($3), read($4));
+
+A = as.frame(A)
+
+s = A[from:to, from:to];
+write(s, $8);
+
+print(toString(s))
diff --git a/src/test/scripts/functions/federated/FederatedRightIndexFullTest.dml b/src/test/scripts/functions/federated/FederatedRightIndexFullTest.dml
index a3af7bc..a0f42bd 100644
--- a/src/test/scripts/functions/federated/FederatedRightIndexFullTest.dml
+++ b/src/test/scripts/functions/federated/FederatedRightIndexFullTest.dml
@@ -22,14 +22,15 @@
 from = $from;
 to = $to;
 
+
 if ($rP) {
-    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
-        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
-    		list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
+  A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+    ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
+    list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
 } else {
-    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
-            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
-            	list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+  A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+    ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
+    list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
 }
 
 s = A[from:to, from:to];