[SYSTEMDS-2696] Overlapping relational operations

This commit adds relational support for relational operations,
(< > <= etc) in the compressed space, for overlapping matrices.

If the relational expression returns a constant matrix, the operations
are super fast, while if the output is mixed it performs okay compared
to our uncompressed. Further optimizations are available, but since this
operation is not critical it is not explored yet.
diff --git a/pom.xml b/pom.xml
index 4027916..5433f3c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -280,7 +280,7 @@
 					<reuseForks>false</reuseForks>
 					<reportFormat>brief</reportFormat>
 					<trimStackTrace>true</trimStackTrace>
-					<rerunFailingTestsCount>2</rerunFailingTestsCount>
+					<!-- <rerunFailingTestsCount>2</rerunFailingTestsCount> -->
 				</configuration>
 			</plugin>
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/BitmapEncoder.java b/src/main/java/org/apache/sysds/runtime/compress/BitmapEncoder.java
index d3f15fd..ba2f82d 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/BitmapEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/BitmapEncoder.java
@@ -21,6 +21,7 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -74,7 +75,7 @@
 			else
 				reader = new ReaderColumnSelectionDense(rawBlock, colIndices, compSettings);
 
-			res = extractBitmap(colIndices, rawBlock, reader);
+			res = extractBitmap(colIndices, reader);
 		}
 		if(compSettings.lossy) {
 			return makeBitmapLossy(res);
@@ -84,6 +85,12 @@
 		}
 	}
 
+	public static ABitmap extractBitmap(int[] colIndices, int rows, BitSet rawBlock, CompressionSettings compSettings) {
+		ReaderColumnSelection reader = new ReaderColumnSelectionBitSet(rawBlock, rows, colIndices, compSettings);
+		Bitmap res = extractBitmap(colIndices, reader);
+		return res;
+	}
+
 	/**
 	 * Extract Bitmap from a single column.
 	 * 
@@ -149,11 +156,10 @@
 	 * It counts the instances of rows containing only zero values, but other groups can contain a zero value.
 	 * 
 	 * @param colIndices The Column indexes to extract the multi-column bit map from.
-	 * @param rawBlock   The raw block to extract from
 	 * @param rowReader  A Reader for the columns selected.
 	 * @return The Bitmap
 	 */
-	private static Bitmap extractBitmap(int[] colIndices, MatrixBlock rawBlock, ReaderColumnSelection rowReader) {
+	private static Bitmap extractBitmap(int[] colIndices, ReaderColumnSelection rowReader) {
 		// probe map for distinct items (for value or value groups)
 		DblArrayIntListHashMap distinctVals;
 		if(colIndices.length > 10) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index d69ebad..ebe1e0d 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -52,6 +52,7 @@
 import org.apache.sysds.runtime.compress.colgroup.SparseRowIterator;
 import org.apache.sysds.runtime.compress.lib.LibBinaryCellOp;
 import org.apache.sysds.runtime.compress.lib.LibLeftMultBy;
+import org.apache.sysds.runtime.compress.lib.LibRelationalOp;
 import org.apache.sysds.runtime.compress.lib.LibRightMultBy;
 import org.apache.sysds.runtime.compress.lib.LibScalar;
 import org.apache.sysds.runtime.compress.utils.ColumnGroupIterator;
@@ -61,16 +62,21 @@
 import org.apache.sysds.runtime.data.SparseRow;
 import org.apache.sysds.runtime.functionobjects.Builtin;
 import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode;
+import org.apache.sysds.runtime.functionobjects.Equals;
+import org.apache.sysds.runtime.functionobjects.GreaterThan;
+import org.apache.sysds.runtime.functionobjects.GreaterThanEquals;
 import org.apache.sysds.runtime.functionobjects.KahanFunction;
 import org.apache.sysds.runtime.functionobjects.KahanPlus;
 import org.apache.sysds.runtime.functionobjects.KahanPlusSq;
+import org.apache.sysds.runtime.functionobjects.LessThan;
+import org.apache.sysds.runtime.functionobjects.LessThanEquals;
 import org.apache.sysds.runtime.functionobjects.Mean;
 import org.apache.sysds.runtime.functionobjects.Minus;
 import org.apache.sysds.runtime.functionobjects.MinusMultiply;
 import org.apache.sysds.runtime.functionobjects.Multiply;
+import org.apache.sysds.runtime.functionobjects.NotEquals;
 import org.apache.sysds.runtime.functionobjects.Plus;
 import org.apache.sysds.runtime.functionobjects.PlusMultiply;
-import org.apache.sysds.runtime.functionobjects.Power2;
 import org.apache.sysds.runtime.functionobjects.ReduceAll;
 import org.apache.sysds.runtime.functionobjects.ReduceCol;
 import org.apache.sysds.runtime.functionobjects.ReduceRow;
@@ -84,7 +90,6 @@
 import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.LeftScalarOperator;
 import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
 import org.apache.sysds.runtime.util.CommonThreadPool;
 import org.apache.sysds.utils.DMLCompressionStatistics;
@@ -194,7 +199,7 @@
 		if(DMLScript.STATISTICS || LOG.isDebugEnabled()) {
 			double t = time.stop();
 			LOG.debug("decompressed block w/ k=" + 1 + " in " + t + "ms.");
-			DMLCompressionStatistics.addDecompressTime(t,1);
+			DMLCompressionStatistics.addDecompressTime(t, 1);
 		}
 		return ret;
 	}
@@ -372,18 +377,27 @@
 
 	@Override
 	public MatrixBlock scalarOperations(ScalarOperator sop, MatrixValue result) {
-		if(overlappingColGroups && !(sop.fn instanceof Multiply || sop.fn instanceof Plus || sop.fn instanceof Minus ||
-			(sop instanceof LeftScalarOperator && sop.fn instanceof Power2))) {
+		// Special case handling of overlapping relational operations
+		if(sop.fn instanceof LessThan || sop.fn instanceof LessThanEquals || sop.fn instanceof GreaterThan ||
+			sop.fn instanceof GreaterThanEquals || sop.fn instanceof Equals || sop.fn instanceof NotEquals) {
+			CompressedMatrixBlock ret = null;
+			if(result == null || !(result instanceof CompressedMatrixBlock))
+				ret = new CompressedMatrixBlock(getNumRows(), getNumColumns(), sparse);
+			return LibRelationalOp.relationalOperation(sop, this, ret, overlappingColGroups);
+		}
+
+		if(overlappingColGroups &&
+			(!(sop.fn instanceof Multiply || sop.fn instanceof Plus || sop.fn instanceof Minus))) {
+			LOG.warn("scalar overlapping not supported for op: " + sop.fn);
 			MatrixBlock m1d = decompress(sop.getNumThreads());
-			result = m1d.scalarOperations(sop, result);
-			return (MatrixBlock) result;
+			return m1d.scalarOperations(sop, result);
+
 		}
 
 		CompressedMatrixBlock ret = null;
 		if(result == null || !(result instanceof CompressedMatrixBlock))
 			ret = new CompressedMatrixBlock(getNumRows(), getNumColumns(), sparse);
-		result = LibScalar.scalarOperations(sop, this, ret, overlappingColGroups);
-		return (MatrixBlock) result;
+		return LibScalar.scalarOperations(sop, this, ret, overlappingColGroups);
 	}
 
 	@Override
@@ -499,7 +513,7 @@
 
 		// compute matrix mult
 		MatrixBlock tmp = new MatrixBlock(rlen, 1, false);
-		tmp  = LibRightMultBy.rightMultByMatrix(_colGroups, v, tmp, k, getMaxNumValues(), false);
+		tmp = LibRightMultBy.rightMultByMatrix(_colGroups, v, tmp, k, getMaxNumValues(), false);
 		if(ctype == ChainType.XtwXv) {
 			BinaryOperator bop = new BinaryOperator(Multiply.getMultiplyFnObject());
 			LibMatrixBincell.bincellOpInPlace(tmp, w, bop);
@@ -563,6 +577,7 @@
 			(op.aggOp.increOp.fn instanceof KahanPlusSq || (op.aggOp.increOp.fn instanceof Builtin &&
 				(((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
 					((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)))) {
+			LOG.info("Unsupported overlapping aggregate: " + op.aggOp.increOp.fn);
 			MatrixBlock m1d = decompress(op.getNumThreads());
 			return m1d.aggregateUnaryOperations(op, result, blen, indexesIn, inCP);
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionBitSet.java b/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionBitSet.java
new file mode 100644
index 0000000..8f4aeff
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionBitSet.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress;
+
+import java.util.BitSet;
+
+import org.apache.sysds.runtime.compress.utils.DblArray;
+
+public class ReaderColumnSelectionBitSet extends ReaderColumnSelection {
+	protected BitSet _data;
+
+	private DblArray reusableReturn;
+	private double[] reusableArr;
+
+	public ReaderColumnSelectionBitSet(BitSet data, int rows, int[] colIndices, CompressionSettings compSettings) {
+		super(colIndices, rows, compSettings);
+		_data = data;
+		reusableArr = new double[colIndices.length];
+		reusableReturn = new DblArray(reusableArr);
+	}
+
+	protected DblArray getNextRow() {
+		if(_lastRow == _numRows - 1)
+			return null;
+		_lastRow++;
+		for(int i = 0; i < _colIndexes.length; i++) {
+			reusableArr[i] = _data.get(_lastRow * _colIndexes.length + i) ? 1 : 0;
+		}
+		return reusableReturn;
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java
index 6e833b4..7929526 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java
@@ -30,6 +30,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.data.SparseRow;
+import org.apache.sysds.runtime.functionobjects.Builtin;
 import org.apache.sysds.runtime.matrix.data.IJV;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
@@ -192,6 +193,27 @@
 	public abstract void decompressToBlock(MatrixBlock target, int rl, int ru);
 
 	/**
+	 * Decompress the contents of this column group into the specified full matrix block.
+	 * 
+	 * @param target a matrix block where the columns covered by this column group have not yet been filled in.
+	 * @param rl     row lower
+	 * @param ru     row upper
+	 * @param offT   The offset into the target matrix block to decompress to.
+	 */
+	public abstract void decompressToBlock(MatrixBlock target, int rl, int ru, int offT);
+
+	/**
+	 * Decompress the contents of this column group into the specified full matrix block.
+	 * 
+	 * @param target a matrix block where the columns covered by this column group have not yet been filled in.
+	 * @param rl     row lower
+	 * @param ru     row upper
+	 * @param offT   The offset into the target matrix block to decompress to.
+	 * @param values The Values materialized in the dictionary
+	 */
+	public abstract void decompressToBlock(MatrixBlock target, int rl, int ru, int offT, double[] values);
+
+	/**
 	 * Decompress the contents of this column group into uncompressed packed columns
 	 * 
 	 * @param target          a dense matrix block. The block must have enough space to hold the contents of this column
@@ -200,10 +222,10 @@
 	 */
 	public abstract void decompressToBlock(MatrixBlock target, int[] colIndexTargets);
 
-	public static void decompressToBlock(MatrixBlock target, int colIndex, List<ColGroup> colGroups){
-		for(ColGroup g: colGroups){
+	public static void decompressToBlock(MatrixBlock target, int colIndex, List<ColGroup> colGroups) {
+		for(ColGroup g : colGroups) {
 			int groupColIndex = Arrays.binarySearch(g._colIndexes, colIndex);
-			if( groupColIndex >= 0){
+			if(groupColIndex >= 0) {
 				g.decompressToBlock(target, groupColIndex);
 			}
 		}
@@ -350,8 +372,8 @@
 	 * @param ru      The row to stop the matrix multiplication at.
 	 * @param vOff    The offset into the first argument matrix to start at.
 	 */
-	public abstract void leftMultByMatrix(double[] matrix, double[] result, double[] values, int numRows,
-		int numCols, int rl, int ru, int vOff);
+	public abstract void leftMultByMatrix(double[] matrix, double[] result, double[] values, int numRows, int numCols,
+		int rl, int ru, int vOff);
 
 	/**
 	 * Multiply with a sparse matrix on the left hand side, and add the values to the output result
@@ -401,6 +423,15 @@
 	public abstract void unaryAggregateOperations(AggregateUnaryOperator op, double[] c);
 
 	/**
+	 * Compute the max / min value contained in the dictionary.
+	 * 
+	 * @param c       Initial value
+	 * @param builtin The build in to use
+	 * @return The result value
+	 */
+	public abstract double computeMxx(double c, Builtin builtin);
+
+	/**
 	 * Unary Aggregate operator, since aggregate operators require new object output, the output becomes an uncompressed
 	 * matrix.
 	 * 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
index c4f148a..d0629ff 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
@@ -110,11 +110,10 @@
 	}
 
 	@Override
-	public void decompressToBlock(MatrixBlock target, int rl, int ru) {
+	public void decompressToBlock(MatrixBlock target, int rl, int ru, int offT, double[] values) {
 		final int ncol = getNumCols();
-		final double[] values = getValues();
 
-		for(int i = rl; i < ru; i++)
+		for(int i = rl; i < ru; i++, offT++)
 			for(int j = 0; j < ncol; j++) {
 				double v = target.quickGetValue(i, _colIndexes[j]);
 				target.setValue(i, _colIndexes[j], values[j] + v);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
index a3159ae..0263307 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
@@ -55,14 +55,14 @@
 	}
 
 	@Override
-	public void decompressToBlock(MatrixBlock target, int rl, int ru) {
+	public void decompressToBlock(MatrixBlock target, int rl, int ru, int off, double[] values) {
 		final int nCol = getNumCols();
-		final double[] values = getValues();
-		for(int i = rl; i < ru; i++)
+		for(int i = rl; i < ru; i++, off++) {
 			for(int j = 0; j < nCol; j++) {
-				double v = target.quickGetValue(i, _colIndexes[j]);
-				target.quickSetValue(i, _colIndexes[j], getData(i, j, values) + v);
+				double v = target.quickGetValue(off, _colIndexes[j]);
+				target.quickSetValue(off, _colIndexes[j], getData(i, j, values) + v);
 			}
+		}
 	}
 
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java
index 466c942..5e0f712 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java
@@ -88,39 +88,43 @@
 	}
 
 	@Override
-	public void decompressToBlock(MatrixBlock target, int rl, int ru) {
+	public void decompressToBlock(MatrixBlock target, int rl, int ru, int offT, double[] values) {
 		if(getNumValues() > 1) {
 			final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
 			final int numCols = getNumCols();
 			final int numVals = getNumValues();
-			final double[] values = getValues();
 
 			// cache blocking config and position array
 			int[] apos = skipScan(numVals, rl);
 
 			// cache conscious append via horizontal scans
-			for(int bi = rl; bi < ru; bi += blksz) {
+			for(int bi = (rl / blksz) * blksz; bi < ru; bi += blksz) {
 				for(int k = 0, off = 0; k < numVals; k++, off += numCols) {
 					int boff = _ptr[k];
 					int blen = len(k);
 					int bix = apos[k];
+					
 					if(bix >= blen)
 						continue;
 					int len = _data[boff + bix];
 					int pos = boff + bix + 1;
-					for(int i = pos; i < pos + len; i++)
-						for(int j = 0, rix = bi + _data[i]; j < numCols; j++)
-							if(values[off + j] != 0) {
-								double v = target.quickGetValue(rix, _colIndexes[j]);
-								target.setValue(rix, _colIndexes[j], values[off + j] + v);
-							}
+					for(int i = pos; i < pos + len; i++) {
+						int row = bi + _data[i];
+						if(row >= rl && row < ru){
+							int rix = row - (rl - offT);
+								for(int j = 0; j < numCols; j++) {
+									double v = target.quickGetValue(rix, _colIndexes[j]);
+									target.setValue(rix, _colIndexes[j], values[off + j] + v);
+								}
+						}
+					}
 					apos[k] += len + 1;
 				}
 			}
 		}
 		else {
 			// call generic decompression with decoder
-			super.decompressToBlock(target, rl, ru);
+			super.decompressToBlock(target, rl, ru, offT, values);
 		}
 	}
 
@@ -881,8 +885,9 @@
 	 * @return array of positions for all values
 	 */
 	private int[] skipScan(int numVals, int rl) {
-		int[] ret = allocIVector(numVals, rl == 0);
 		final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
+		rl = (rl / blksz) * blksz;
+		int[] ret = allocIVector(numVals, rl == 0);
 
 		if(rl > 0) { // rl aligned with blksz
 			for(int k = 0; k < numVals; k++) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java
index 1d6f1d1..d518434 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java
@@ -98,13 +98,11 @@
 		}
 	}
 
-	// generic decompression for OLE/RLE, to be overwritten for performance
 	@Override
-	public void decompressToBlock(MatrixBlock target, int rl, int ru) {
+	public void decompressToBlock(MatrixBlock target, int rl, int ru, int offT, double[] values) {
 		final int numCols = getNumCols();
 		final int numVals = getNumValues();
 		int[] colIndices = getColIndices();
-		final double[] values = getValues();
 
 		// Run through the bitmaps for this column group
 		for(int i = 0; i < numVals; i++) {
@@ -117,9 +115,11 @@
 					continue;
 				if(row > ru)
 					break;
-
-				for(int colIx = 0; colIx < numCols; colIx++)
-					target.appendValue(row, colIndices[colIx], values[valOff + colIx]);
+				row = row - (rl - offT);
+				for(int colIx = 0; colIx < numCols; colIx++){
+					double v = target.quickGetValue(row , colIndices[colIx]);
+					target.setValue(row, colIndices[colIx], v + values[valOff + colIx]);
+				}
 			}
 		}
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java
index 1c82862..6bbcd5d 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java
@@ -90,12 +90,11 @@
 	}
 
 	@Override
-	public void decompressToBlock(MatrixBlock target, int rl, int ru) {
+	public void decompressToBlock(MatrixBlock target, int rl, int ru, int offT, double[] values) {
 		if(getNumValues() > 1) {
 			final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
 			final int numCols = getNumCols();
 			final int numVals = getNumValues();
-			final double[] values = getValues();
 
 			// position and start offset arrays
 			int[] astart = new int[numVals];
@@ -112,11 +111,11 @@
 					for(; bix < blen & start < bimax; bix += 2) {
 						start += _data[boff + bix];
 						int len = _data[boff + bix + 1];
-						for(int i = Math.max(rl, start); i < Math.min(start + len, ru); i++)
+						for(int i = Math.max(rl, start) - (rl - offT); i < Math.min(start + len, ru) - (rl - offT); i++)
 							for(int j = 0; j < numCols; j++) {
 								if(values[off + j] != 0) {
 									double v = target.quickGetValue(i, _colIndexes[j]);
-									target.setValue(i, _colIndexes[j], values[off + j] + v);
+									target.quickSetValue(i, _colIndexes[j], values[off + j] + v);
 								}
 							}
 						start += len;
@@ -128,7 +127,7 @@
 		}
 		else {
 			// call generic decompression with decoder
-			super.decompressToBlock(target, rl, ru);
+			super.decompressToBlock(target, rl, ru, offT, values);
 		}
 	}
 
@@ -210,7 +209,7 @@
 				for(; bix < blen & start < bimax; bix += 2) {
 					start += _data[boff + bix];
 					int len = _data[boff + bix + 1];
-					for(int i = start; i< start + len; i++)
+					for(int i = start; i < start + len; i++)
 						c[i] += values[off + colpos];
 					nnz += len;
 					start += len;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
index 9898bb2..bc50ca5 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
@@ -32,6 +32,7 @@
 import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.data.SparseBlock.Type;
 import org.apache.sysds.runtime.data.SparseRow;
+import org.apache.sysds.runtime.functionobjects.Builtin;
 import org.apache.sysds.runtime.functionobjects.ReduceRow;
 import org.apache.sysds.runtime.matrix.data.IJV;
 import org.apache.sysds.runtime.matrix.data.LibMatrixAgg;
@@ -201,14 +202,33 @@
 
 	@Override
 	public void decompressToBlock(MatrixBlock target, int rl, int ru) {
+		decompressToBlock(target, rl, ru, rl);
+	}
+
+	@Override
+	public void decompressToBlock(MatrixBlock target, int rl, int ru, int offT) {
 		// empty block, nothing to add to output
 		if(_data.isEmptyBlock(false))
 			return;
-		for(int row = rl; row < ru; row++) {
+		for(int row = rl; row < ru; row++, offT++) {
 			for(int colIx = 0; colIx < _colIndexes.length; colIx++) {
 				int col = _colIndexes[colIx];
 				double cellVal = _data.quickGetValue(row, colIx);
-				target.quickSetValue(row, col, cellVal);
+				target.quickSetValue(offT, col, target.quickGetValue(offT, col) + cellVal);
+			}
+		}
+	}
+
+	@Override
+	public void decompressToBlock(MatrixBlock target, int rl, int ru, int offT, double[] values) {
+		// empty block, nothing to add to output
+		if(_data.isEmptyBlock(false))
+			return;
+		for(int row = rl; row < ru; row++, offT++) {
+			for(int colIx = 0; colIx < _colIndexes.length; colIx++) {
+				int col = _colIndexes[colIx];
+				double cellVal = _data.quickGetValue(row, colIx);
+				target.quickSetValue(offT, col, target.quickGetValue(offT, col) + cellVal);
 			}
 		}
 	}
@@ -317,8 +337,8 @@
 	}
 
 	@Override
-	public void leftMultByMatrix(double[] vector, double[] c, double[] values, int numRows, int numCols,
-		int rl, int ru, int vOff) {
+	public void leftMultByMatrix(double[] vector, double[] c, double[] values, int numRows, int numCols, int rl, int ru,
+		int vOff) {
 		throw new NotImplementedException("Should not be called use other matrix function for uncompressed columns");
 	}
 
@@ -328,6 +348,10 @@
 		throw new NotImplementedException("Should not be called use other matrix function for uncompressed columns");
 	}
 
+	public double computeMxx(double c, Builtin builtin) {
+		throw new NotImplementedException("Not implemented max min on uncompressed");
+	}
+
 	public void leftMultByMatrix(MatrixBlock matrix, MatrixBlock result) {
 		MatrixBlock pret = new MatrixBlock(matrix.getNumRows(), _colIndexes.length, false);
 		LibMatrixMult.matrixMult(matrix, _data, pret);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
index ab06509..442c2ad 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
@@ -103,6 +103,16 @@
 		super(colIndices, numRows);
 		_dict = dict;
 	}
+	
+	@Override
+	public void decompressToBlock(MatrixBlock target, int rl, int ru) {
+		decompressToBlock(target,rl,ru,rl);
+	}
+
+	@Override
+	public void decompressToBlock(MatrixBlock target, int rl, int ru, int offT) {
+		decompressToBlock(target,rl,ru,offT, getValues());
+	}
 
 	/**
 	 * Obtain number of distinct sets of values associated with the bitmaps in this column group.
@@ -331,19 +341,11 @@
 		return ret;
 	}
 
-	/**
-	 * Compute the Max or other equivalent operations.
-	 * 
-	 * NOTE: Shared across OLE/RLE/DDC because value-only computation.
-	 * 
-	 * @param c       output matrix block
-	 * @param builtin function object
-	 */
-	protected void computeMxx(double[] c, Builtin builtin) {
+	public double computeMxx(double c, Builtin builtin) {
 		if(_zeros) {
-			c[0] = builtin.execute(c[0], 0);
+			c = builtin.execute(c, 0);
 		}
-		c[0] = _dict.aggregate(c[0], builtin);
+		return _dict.aggregate(c, builtin);
 	}
 
 	/**
@@ -434,7 +436,7 @@
 			Builtin builtin = (Builtin) op.aggOp.increOp.fn;
 
 			if(op.indexFn instanceof ReduceAll)
-				computeMxx(c, builtin);
+				c[0] = computeMxx(c[0], builtin);
 			else if(op.indexFn instanceof ReduceCol)
 				computeRowMxx(c, builtin, rl, ru);
 			else if(op.indexFn instanceof ReduceRow)
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibRelationalOp.java b/src/main/java/org/apache/sysds/runtime/compress/lib/LibRelationalOp.java
new file mode 100644
index 0000000..1b48adc
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibRelationalOp.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.lib;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.BitmapEncoder;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysds.runtime.compress.CompressionSettings;
+import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
+import org.apache.sysds.runtime.compress.colgroup.ColGroup;
+import org.apache.sysds.runtime.compress.colgroup.ColGroup.CompressionType;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupConst;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
+import org.apache.sysds.runtime.compress.colgroup.Dictionary;
+import org.apache.sysds.runtime.compress.utils.ABitmap;
+import org.apache.sysds.runtime.functionobjects.Builtin;
+import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode;
+import org.apache.sysds.runtime.functionobjects.Equals;
+import org.apache.sysds.runtime.functionobjects.GreaterThan;
+import org.apache.sysds.runtime.functionobjects.GreaterThanEquals;
+import org.apache.sysds.runtime.functionobjects.LessThan;
+import org.apache.sysds.runtime.functionobjects.LessThanEquals;
+import org.apache.sysds.runtime.functionobjects.NotEquals;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.LeftScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.RightScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
+import org.apache.sysds.runtime.util.CommonThreadPool;
+
+/**
+ * This class is used for relational operators that return binary values depending on individual cells values in the
+ * compression. This indicate that the resulting vectors/matrices are amenable to compression since they only contain
+ * two distinct values, true or false.
+ * 
+ */
+public class LibRelationalOp {
+    // private static final Log LOG = LogFactory.getLog(LibRelationalOp.class.getName());
+
+    /** Thread pool matrix Block for materializing decompressed groups. */
+    private static ThreadLocal<MatrixBlock> memPool = new ThreadLocal<MatrixBlock>() {
+        @Override
+        protected MatrixBlock initialValue() {
+            return null;
+        }
+    };
+
+    public static MatrixBlock relationalOperation(ScalarOperator sop, CompressedMatrixBlock m1,
+        CompressedMatrixBlock ret, boolean overlapping) {
+
+        List<ColGroup> colGroups = m1.getColGroups();
+        if(overlapping) {
+            if(sop.fn instanceof LessThan || sop.fn instanceof LessThanEquals || sop.fn instanceof GreaterThan ||
+                sop.fn instanceof GreaterThanEquals || sop.fn instanceof Equals || sop.fn instanceof NotEquals)
+                return overlappingRelativeRelationalOperation(sop, m1, ret);
+        }
+        else {
+            List<ColGroup> newColGroups = new ArrayList<>();
+            for(ColGroup grp : colGroups) {
+                newColGroups.add(grp.scalarOperation(sop));
+            }
+            ret.allocateColGroupList(newColGroups);
+            ret.setNonZeros(-1);
+            ret.setOverlapping(false);
+        }
+
+        return ret;
+    }
+
+    private static MatrixBlock overlappingRelativeRelationalOperation(ScalarOperator sop, CompressedMatrixBlock m1,
+        CompressedMatrixBlock ret) {
+
+        List<ColGroup> colGroups = m1.getColGroups();
+        boolean less = ((sop.fn instanceof LessThan || sop.fn instanceof LessThanEquals) &&
+            sop instanceof LeftScalarOperator) ||
+            (sop instanceof RightScalarOperator &&
+                (sop.fn instanceof GreaterThan || sop.fn instanceof GreaterThanEquals));
+        double v = sop.getConstant();
+        // Queue<Pair<Double, ColGroup>> pq = new PriorityQueue<>();
+        MinMaxGroup[] minMax = new MinMaxGroup[colGroups.size()];
+        double maxS = 0.0;
+        double minS = 0.0;
+        Builtin min = Builtin.getBuiltinFnObject(BuiltinCode.MIN);
+        Builtin max = Builtin.getBuiltinFnObject(BuiltinCode.MAX);
+        int id = 0;
+        for(ColGroup grp : colGroups) {
+            double infN = Double.NEGATIVE_INFINITY;
+            double infP = Double.POSITIVE_INFINITY;
+            double minG = grp.computeMxx(infP, min);
+            double maxG = grp.computeMxx(infN, max);
+            minS += minG;
+            maxS += maxG;
+            minMax[id++] = new MinMaxGroup(minG, maxG, grp);
+        }
+
+        // Shortcut:
+        // If we know worst case min and worst case max and the values to compare to in all cases is
+        // less then or greater than worst then we can return a full matrix with either 1 or 0.
+
+        if(v < minS || v > maxS) {
+            if(sop.fn instanceof Equals) {
+                return makeConstZero(ret);
+            }
+            else if(sop.fn instanceof NotEquals) {
+                return makeConstOne(ret);
+            }
+            else if(less) {
+                if(v < minS || ((sop.fn instanceof LessThanEquals || sop.fn instanceof GreaterThan) && v <= minS))
+                    return makeConstOne(ret);
+                else
+                    return makeConstZero(ret);
+
+            }
+            else {
+                if(v > minS || ((sop.fn instanceof LessThanEquals || sop.fn instanceof GreaterThan) && v >= minS))
+                    return makeConstOne(ret);
+                else
+                    return makeConstZero(ret);
+            }
+        }
+        else {
+            return processNonConstant(sop, ret, minMax, minS, maxS, less);
+        }
+
+    }
+
+    private static MatrixBlock makeConstOne(CompressedMatrixBlock ret) {
+        List<ColGroup> newColGroups = new ArrayList<>();
+        int[] colIndexes = new int[ret.getNumColumns()];
+        for(int i = 0; i < colIndexes.length; i++) {
+            colIndexes[i] = i;
+        }
+        double[] values = new double[ret.getNumColumns()];
+        Arrays.fill(values, 1);
+
+        newColGroups.add(new ColGroupConst(colIndexes, ret.getNumRows(), new Dictionary(values)));
+        ret.allocateColGroupList(newColGroups);
+        ret.setNonZeros(ret.getNumColumns() * ret.getNumRows());
+        ret.setOverlapping(false);
+        return ret;
+    }
+
+    private static MatrixBlock makeConstZero(CompressedMatrixBlock ret) {
+        MatrixBlock sb = new MatrixBlock(ret.getNumRows(), ret.getNumColumns(), true, 0);
+        return sb;
+    }
+
+    private static MatrixBlock processNonConstant(ScalarOperator sop, CompressedMatrixBlock ret, MinMaxGroup[] minMax,
+        double minS, double maxS, boolean less) {
+
+        BitSet res = new BitSet(ret.getNumColumns() * ret.getNumRows());
+        int k = OptimizerUtils.getConstrainedNumThreads(-1);
+        int outRows = ret.getNumRows();
+
+        if(k == 1) {
+            final int b = CompressionSettings.BITMAP_BLOCK_SZ / ret.getNumColumns();
+            final int blkz = (outRows < b) ? outRows : b;
+
+            MatrixBlock tmp = new MatrixBlock(blkz, ret.getNumColumns(), false, -1).allocateBlock();
+            for(int i = 0; i * blkz < outRows; i++) {
+
+                // LOG.error(mmg.g.getClass());
+                for(MinMaxGroup mmg : minMax) {
+                    mmg.g.decompressToBlock(tmp, i * blkz, Math.min((i + 1) * blkz, mmg.g.getNumRows()), 0, mmg.values);
+                    // minS -= mmg.min;
+                    // maxS -= mmg.max;
+                }
+                for(int row = 0; row < blkz && row < ret.getNumRows() - i * blkz; row++) {
+                    int off = (row + i * blkz) * ret.getNumColumns();
+                    for(int col = 0; col < ret.getNumColumns(); col++, off++) {
+                        if(sop.executeScalar(tmp.quickGetValue(row, col)) != 0.0)
+                            res.set(off);
+                    }
+                }
+                tmp.reset();
+            }
+        }
+        else {
+            final int blkz = 65536 / ret.getNumColumns();
+            ExecutorService pool = CommonThreadPool.get(k);
+            ArrayList<RelationalTask> tasks = new ArrayList<>();
+            try {
+                for(int i = 0; i * blkz < outRows; i++) {
+                    RelationalTask rt = new RelationalTask(minMax, i, blkz, res, ret.getNumRows(), ret.getNumColumns(),
+                        sop);
+                    tasks.add(rt);
+                }
+                List<Future<Object>> futures = pool.invokeAll(tasks);
+                pool.shutdown();
+                for(Future<Object> f : futures)
+                    f.get();
+                memPool.remove();
+            }
+            catch(InterruptedException | ExecutionException e) {
+                throw new DMLRuntimeException(e);
+            }
+        }
+
+        int[] colIndexes = new int[ret.getNumColumns()];
+        for(int i = 0; i < colIndexes.length; i++) {
+            colIndexes[i] = i;
+        }
+        CompressionSettings cs = new CompressionSettingsBuilder().setTransposeInput(false).create();
+        ABitmap bm = BitmapEncoder.extractBitmap(colIndexes, ret.getNumRows(), res, cs);
+
+        ColGroup resGroup = ColGroupFactory.compress(colIndexes, ret.getNumRows(), bm, CompressionType.DDC, cs, null);
+        List<ColGroup> newColGroups = new ArrayList<>();
+        newColGroups.add(resGroup);
+        ret.allocateColGroupList(newColGroups);
+        ret.setNonZeros(ret.getNumColumns() * ret.getNumRows());
+        ret.setOverlapping(false);
+        return ret;
+    }
+
+    protected static class MinMaxGroup {
+        double min;
+        double max;
+        ColGroup g;
+        double[] values;
+
+        public MinMaxGroup(double min, double max, ColGroup g) {
+            this.min = min;
+            this.max = max;
+            this.g = g;
+            this.values = g.getValues();
+        }
+    }
+
+    private static class RelationalTask implements Callable<Object> {
+        private final MinMaxGroup[] _minMax;
+        private final int _i;
+        private final int _blkz;
+        private final BitSet _res;
+        private final int _rows;
+        private final int _cols;
+        private final ScalarOperator _sop;
+
+        protected RelationalTask(MinMaxGroup[] minMax, int i, int blkz, BitSet res, int rows, int cols,
+            ScalarOperator sop) {
+            _minMax = minMax;
+            _i = i;
+            _blkz = blkz;
+            _res = res;
+            _rows = rows;
+            _cols = cols;
+            _sop = sop;
+        }
+
+        @Override
+        public Object call() {
+            MatrixBlock tmp = memPool.get();
+            if(tmp == null) {
+                memPool.set(new MatrixBlock(_blkz, _cols, false, -1).allocateBlock());
+                tmp = memPool.get();
+            }
+            else {
+                tmp = memPool.get();
+                tmp.reset(_blkz, _cols, false, -1);
+            }
+
+            for(MinMaxGroup mmg : _minMax) {
+                mmg.g.decompressToBlock(tmp, _i * _blkz, Math.min((_i + 1) * _blkz, mmg.g.getNumRows()), 0, mmg.values);
+                // minS -= mmg.min;
+                // maxS -= mmg.max;
+            }
+            for(int row = 0; row < _blkz && row < _rows - _i * _blkz; row++) {
+                int off = (row + _i * _blkz) * _cols;
+                for(int col = 0; col < _cols; col++, off++) {
+                    if(_sop.executeScalar(tmp.quickGetValue(row, col)) != 0.0)
+                        _res.set(off);
+                }
+            }
+
+            return null;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
index a92f4dd..89bf691 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
@@ -607,6 +607,8 @@
 	 */
 	public static ScalarOperator parseScalarBinaryOperator(String opcode, boolean arg1IsScalar, double constant)
 	{
+		// TODO add Multithreaded threads to Scalar operations.
+
 		//commutative operators
 		if ( opcode.equalsIgnoreCase("+") ){ 
 			return new RightScalarOperator(Plus.getPlusFnObject(), constant); 
diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
index e26ec03..83079dd 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
@@ -36,7 +36,9 @@
 import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
 import org.apache.sysds.runtime.compress.CompressionStatistics;
 import org.apache.sysds.runtime.compress.colgroup.ColGroup.CompressionType;
+import org.apache.sysds.runtime.functionobjects.Equals;
 import org.apache.sysds.runtime.functionobjects.GreaterThan;
+import org.apache.sysds.runtime.functionobjects.GreaterThanEquals;
 import org.apache.sysds.runtime.functionobjects.LessThanEquals;
 import org.apache.sysds.runtime.functionobjects.Minus;
 import org.apache.sysds.runtime.functionobjects.Multiply;
@@ -84,8 +86,11 @@
 		// ValueRange.BYTE
 	};
 
-	protected static OverLapping[] overLapping = new OverLapping[] {OverLapping.COL, OverLapping.MATRIX,
-		OverLapping.NONE, OverLapping.MATRIX_PLUS, OverLapping.MATRIX_MULT_NEGATIVE};
+	protected static OverLapping[] overLapping = new OverLapping[] {OverLapping.COL,
+		// OverLapping.MATRIX,
+		OverLapping.NONE, OverLapping.MATRIX_PLUS,
+		// OverLapping.MATRIX_MULT_NEGATIVE
+	};
 
 	private static final int compressionSeed = 7;
 
@@ -535,8 +540,7 @@
 				}
 				else {
 					if(rows > 50000) {
-						TestUtils
-							.compareMatricesPercentageDistance(d1, d2, 0.99, 0.999, this.toString());
+						TestUtils.compareMatricesPercentageDistance(d1, d2, 0.99, 0.999, this.toString());
 					}
 					else if(overlappingType == OverLapping.MATRIX_MULT_NEGATIVE ||
 						overlappingType == OverLapping.MATRIX_PLUS || overlappingType == OverLapping.MATRIX ||
@@ -597,6 +601,13 @@
 	}
 
 	@Test
+	public void testScalarRightOpEquals() {
+		double addValue = 1.0;
+		ScalarOperator sop = new RightScalarOperator(Equals.getEqualsFnObject(), addValue);
+		testScalarOperations(sop, lossyTolerance + 0.1);
+	}
+
+	@Test
 	public void testScalarRightOpPower2() {
 		double addValue = 2;
 		ScalarOperator sop = new RightScalarOperator(Power2.getPower2FnObject(), addValue);
@@ -639,12 +650,40 @@
 	}
 
 	@Test
+	public void testScalarLeftOpLessSmallValue() {
+		double addValue = -1000000.11;
+		ScalarOperator sop = new LeftScalarOperator(LessThanEquals.getLessThanEqualsFnObject(), addValue);
+		testScalarOperations(sop, lossyTolerance + 0.1);
+	}
+
+	@Test
+	public void testScalarLeftOpGreaterThanEqualsSmallValue() {
+		double addValue = -1001310000.11;
+		ScalarOperator sop = new LeftScalarOperator(GreaterThanEquals.getGreaterThanEqualsFnObject(), addValue);
+		testScalarOperations(sop, lossyTolerance + 0.1);
+	}
+
+	@Test
+	public void testScalarLeftOpGreaterThanLargeValue() {
+		double addValue = 10132400000.11;
+		ScalarOperator sop = new LeftScalarOperator(GreaterThan.getGreaterThanFnObject(), addValue);
+		testScalarOperations(sop, lossyTolerance + 0.1);
+	}
+
+	@Test
 	public void testScalarLeftOpGreater() {
 		double addValue = 0.11;
 		ScalarOperator sop = new LeftScalarOperator(GreaterThan.getGreaterThanFnObject(), addValue);
 		testScalarOperations(sop, lossyTolerance + 0.1);
 	}
 
+	@Test
+	public void testScalarLeftOpEqual() {
+		double addValue = 1.0;
+		ScalarOperator sop = new LeftScalarOperator(Equals.getEqualsFnObject(), addValue);
+		testScalarOperations(sop, lossyTolerance + 0.1);
+	}
+
 	// @Test
 	// This test does not make sense to execute... since the result of left power always is 4.
 	// Furthermore it does not work consistently in our normal matrix blocks ... and should never be used.