[SYSTEMDS-2741] CLA overlapping unaryAggregates

This commit adds unary aggregates on overlapping column groups to the
supported operations. Aggregations that cannot be computed directly on
overlapping column groups, for example min, max and sqSum, are handled
by decompressing parts of the matrix and then aggregating those parts.
diff --git a/pom.xml b/pom.xml
index 5433f3c..4027916 100644
--- a/pom.xml
+++ b/pom.xml
@@ -280,7 +280,7 @@
 					<reuseForks>false</reuseForks>
 					<reportFormat>brief</reportFormat>
 					<trimStackTrace>true</trimStackTrace>
-					<!-- <rerunFailingTestsCount>2</rerunFailingTestsCount> -->
+					<rerunFailingTestsCount>2</rerunFailingTestsCount>
 				</configuration>
 			</plugin>
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index ebe1e0d..660c28e 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -51,6 +51,7 @@
 import org.apache.sysds.runtime.compress.colgroup.DenseRowIterator;
 import org.apache.sysds.runtime.compress.colgroup.SparseRowIterator;
 import org.apache.sysds.runtime.compress.lib.LibBinaryCellOp;
+import org.apache.sysds.runtime.compress.lib.LibCompAgg;
 import org.apache.sysds.runtime.compress.lib.LibLeftMultBy;
 import org.apache.sysds.runtime.compress.lib.LibRelationalOp;
 import org.apache.sysds.runtime.compress.lib.LibRightMultBy;
@@ -65,7 +66,6 @@
 import org.apache.sysds.runtime.functionobjects.Equals;
 import org.apache.sysds.runtime.functionobjects.GreaterThan;
 import org.apache.sysds.runtime.functionobjects.GreaterThanEquals;
-import org.apache.sysds.runtime.functionobjects.KahanFunction;
 import org.apache.sysds.runtime.functionobjects.KahanPlus;
 import org.apache.sysds.runtime.functionobjects.KahanPlusSq;
 import org.apache.sysds.runtime.functionobjects.LessThan;
@@ -77,10 +77,6 @@
 import org.apache.sysds.runtime.functionobjects.NotEquals;
 import org.apache.sysds.runtime.functionobjects.Plus;
 import org.apache.sysds.runtime.functionobjects.PlusMultiply;
-import org.apache.sysds.runtime.functionobjects.ReduceAll;
-import org.apache.sysds.runtime.functionobjects.ReduceCol;
-import org.apache.sysds.runtime.functionobjects.ReduceRow;
-import org.apache.sysds.runtime.instructions.cp.KahanObject;
 import org.apache.sysds.runtime.matrix.data.IJV;
 import org.apache.sysds.runtime.matrix.data.LibMatrixBincell;
 import org.apache.sysds.runtime.matrix.data.LibMatrixBincell.BinaryAccessType;
@@ -98,9 +94,6 @@
 	private static final Log LOG = LogFactory.getLog(CompressedMatrixBlock.class.getName());
 	private static final long serialVersionUID = 7319372019143154058L;
 
-	/** Threshold for when to parallelize the aggregation functions. */
-	private static final long MIN_PAR_AGG_THRESHOLD = 8 * 1024 * 1024; // 8MB
-
 	/**
 	 * Constructor for building an empty Compressed Matrix block object.
 	 * 
@@ -541,9 +534,7 @@
 
 		// create output matrix block
 		if(right) {
-			that = that instanceof CompressedMatrixBlock ? ((CompressedMatrixBlock) that).decompress() : that;
-			return ret = LibRightMultBy
-				.rightMultByMatrix(_colGroups, that, ret, op.getNumThreads(), getMaxNumValues(), true);
+			return LibRightMultBy.rightMultByMatrix(_colGroups, that, ret, op.getNumThreads(), getMaxNumValues(), true);
 		}
 		else {
 			return LibLeftMultBy.leftMultByMatrix(_colGroups,
@@ -562,6 +553,12 @@
 
 	@Override
 	public MatrixBlock aggregateUnaryOperations(AggregateUnaryOperator op, MatrixValue result, int blen,
+		MatrixIndexes indexesIn) {
+		return aggregateUnaryOperations(op, result, blen, indexesIn, false);
+	}
+
+	@Override
+	public MatrixBlock aggregateUnaryOperations(AggregateUnaryOperator op, MatrixValue result, int blen,
 		MatrixIndexes indexesIn, boolean inCP) {
 
 		// check for supported operations
@@ -573,15 +570,6 @@
 			throw new NotImplementedException("Unary aggregate " + op.aggOp.increOp.fn + " not supported yet.");
 		}
 
-		if(overlappingColGroups &&
-			(op.aggOp.increOp.fn instanceof KahanPlusSq || (op.aggOp.increOp.fn instanceof Builtin &&
-				(((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
-					((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)))) {
-			LOG.info("Unsupported overlapping aggregate: " + op.aggOp.increOp.fn);
-			MatrixBlock m1d = decompress(op.getNumThreads());
-			return m1d.aggregateUnaryOperations(op, result, blen, indexesIn, inCP);
-		}
-
 		// prepare output dimensions
 		CellIndex tempCellIndex = new CellIndex(-1, -1);
 		op.indexFn.computeDimension(rlen, clen, tempCellIndex);
@@ -613,154 +601,19 @@
 		MatrixBlock ret = (MatrixBlock) result;
 		ret.allocateDenseBlock();
 
-		if(op.aggOp.increOp.fn instanceof Builtin) {
-			Double val = null;
-			switch(((Builtin) op.aggOp.increOp.fn).getBuiltinCode()) {
-				case MAX:
-					val = Double.NEGATIVE_INFINITY;
-					break;
-				case MIN:
-					val = Double.POSITIVE_INFINITY;
-					break;
-				default:
-					break;
-			}
-			if(val != null) {
-				ret.getDenseBlock().set(val);
-			}
+		if(overlappingColGroups &&
+			(op.aggOp.increOp.fn instanceof KahanPlusSq || (op.aggOp.increOp.fn instanceof Builtin &&
+				(((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
+					((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)))) {
+			return LibCompAgg.aggregateUnaryOverlapping(this, ret, op, blen, indexesIn, inCP);
 		}
 
-		// core unary aggregate
-		if(op.getNumThreads() > 1 && getExactSizeOnDisk() > MIN_PAR_AGG_THRESHOLD) {
-			// multi-threaded execution of all groups
-			ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning(_colGroups,
-				(op.indexFn instanceof ReduceCol) ? 1 : op.getNumThreads(),
-				false);
-			ColGroupUncompressed uc = getUncompressedColGroup();
-
-			try {
-				// compute uncompressed column group in parallel (otherwise bottleneck)
-				if(uc != null)
-					uc.unaryAggregateOperations(op, ret);
-				// compute all compressed column groups
-				ExecutorService pool = CommonThreadPool.get(op.getNumThreads());
-				ArrayList<UnaryAggregateTask> tasks = new ArrayList<>();
-				if(op.indexFn instanceof ReduceCol && grpParts.length > 0) {
-					final int blkz = CompressionSettings.BITMAP_BLOCK_SZ;
-					int blklen = (int) Math.ceil((double) rlen / op.getNumThreads());
-					blklen += (blklen % blkz != 0) ? blkz - blklen % blkz : 0;
-					for(int i = 0; i < op.getNumThreads() & i * blklen < rlen; i++) {
-						tasks.add(
-							new UnaryAggregateTask(grpParts[0], ret, i * blklen, Math.min((i + 1) * blklen, rlen), op));
-
-					}
-				}
-				else
-					for(ArrayList<ColGroup> grp : grpParts) {
-						if(grp != null)
-							tasks.add(new UnaryAggregateTask(grp, ret, 0, rlen, op));
-					}
-				List<Future<MatrixBlock>> rtasks = pool.invokeAll(tasks);
-				pool.shutdown();
-
-				// aggregate partial results
-				if(op.indexFn instanceof ReduceAll) {
-					if(op.aggOp.increOp.fn instanceof KahanFunction) {
-						KahanObject kbuff = new KahanObject(ret.quickGetValue(0, 0), 0);
-						for(Future<MatrixBlock> rtask : rtasks) {
-							double tmp = rtask.get().quickGetValue(0, 0);
-							((KahanFunction) op.aggOp.increOp.fn).execute2(kbuff, tmp);
-						}
-						ret.quickSetValue(0, 0, kbuff._sum);
-					}
-					else if(op.aggOp.increOp.fn instanceof Mean) {
-						double val = ret.quickGetValue(0, 0);
-						for(Future<MatrixBlock> rtask : rtasks) {
-							double tmp = rtask.get().quickGetValue(0, 0);
-							val = val + tmp;
-						}
-						ret.quickSetValue(0, 0, val);
-					}
-					else {
-						double val = ret.quickGetValue(0, 0);
-						for(Future<MatrixBlock> rtask : rtasks) {
-							double tmp = rtask.get().quickGetValue(0, 0);
-							val = op.aggOp.increOp.fn.execute(val, tmp);
-						}
-						ret.quickSetValue(0, 0, val);
-					}
-				}
-			}
-			catch(InterruptedException | ExecutionException e) {
-				LOG.fatal("UnaryAggregate Exception: " + e.getMessage(), e);
-				throw new DMLRuntimeException(e);
-			}
-		}
-		else {
-			if(_colGroups != null) {
-
-				for(ColGroup grp : _colGroups)
-					if(grp instanceof ColGroupUncompressed)
-						((ColGroupUncompressed) grp).unaryAggregateOperations(op, ret);
-				aggregateUnaryOperations(op, _colGroups, ret, 0, rlen);
-			}
-		}
-
-		// special handling zeros for rowmins/rowmax
-		if(op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof Builtin) {
-			int[] rnnz = new int[rlen];
-			for(ColGroup grp : _colGroups)
-				grp.countNonZerosPerRow(rnnz, 0, rlen);
-			Builtin builtin = (Builtin) op.aggOp.increOp.fn;
-			for(int i = 0; i < rlen; i++)
-				if(rnnz[i] < clen)
-					ret.quickSetValue(i, 0, builtin.execute(ret.quickGetValue(i, 0), 0));
-		}
-
-		// special handling of mean
-		if(op.aggOp.increOp.fn instanceof Mean) {
-			if(op.indexFn instanceof ReduceAll) {
-				ret.quickSetValue(0, 0, ret.quickGetValue(0, 0) / (getNumColumns() * getNumRows()));
-			}
-			else if(op.indexFn instanceof ReduceCol) {
-				for(int i = 0; i < getNumRows(); i++) {
-					ret.quickSetValue(i, 0, ret.quickGetValue(i, 0) / getNumColumns());
-				}
-			}
-			else if(op.indexFn instanceof ReduceRow)
-				for(int i = 0; i < getNumColumns(); i++) {
-					ret.quickSetValue(0, i, ret.quickGetValue(0, i) / getNumRows());
-				}
-		}
-
-		// drop correction if necessary
-		if(op.aggOp.existsCorrection() && inCP)
-			ret.dropLastRowsOrColumns(op.aggOp.correction);
-
-		// post-processing
-		ret.recomputeNonZeros();
+		ret = LibCompAgg.aggregateUnary(this, ret, op, blen, indexesIn, inCP);
 
 		return ret;
 	}
 
 	@Override
-	public MatrixBlock aggregateUnaryOperations(AggregateUnaryOperator op, MatrixValue result, int blen,
-		MatrixIndexes indexesIn) {
-		return aggregateUnaryOperations(op, result, blen, indexesIn, false);
-	}
-
-	private static void aggregateUnaryOperations(AggregateUnaryOperator op, List<ColGroup> groups, MatrixBlock ret,
-		int rl, int ru) {
-
-		// note: UC group never passed into this function
-		double[] c = ret.getDenseBlockValues();
-		for(ColGroup grp : groups)
-			if(grp != null && !(grp instanceof ColGroupUncompressed))
-				grp.unaryAggregateOperations(op, c, rl, ru);
-
-	}
-
-	@Override
 	public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype) {
 		return transposeSelfMatrixMultOperations(out, tstype, 1);
 	}
@@ -794,36 +647,11 @@
 		return out;
 	}
 
-	@SuppressWarnings("unchecked")
-	private static ArrayList<ColGroup>[] createStaticTaskPartitioning(List<ColGroup> colGroups, int k,
-		boolean inclUncompressed) {
-		// special case: single uncompressed col group
-		if(colGroups.size() == 1 && colGroups.get(0) instanceof ColGroupUncompressed) {
-			return new ArrayList[0];
-		}
-
-		// initialize round robin col group distribution
-		// (static task partitioning to reduce mem requirements/final agg)
-		int numTasks = Math.min(k, colGroups.size());
-		ArrayList<ColGroup>[] grpParts = new ArrayList[numTasks];
-		int pos = 0;
-		for(ColGroup grp : colGroups) {
-			if(grpParts[pos] == null)
-				grpParts[pos] = new ArrayList<>();
-			if(inclUncompressed || !(grp instanceof ColGroupUncompressed)) {
-				grpParts[pos].add(grp);
-				pos = (pos == numTasks - 1) ? 0 : pos + 1;
-			}
-		}
-
-		return grpParts;
-	}
-
 	public boolean hasUncompressedColGroup() {
 		return getUncompressedColGroup() != null;
 	}
 
-	private ColGroupUncompressed getUncompressedColGroup() {
+	public ColGroupUncompressed getUncompressedColGroup() {
 		for(ColGroup grp : _colGroups)
 			if(grp instanceof ColGroupUncompressed)
 				return (ColGroupUncompressed) grp;
@@ -854,42 +682,6 @@
 		}
 	}
 
-	private static class UnaryAggregateTask implements Callable<MatrixBlock> {
-		private final List<ColGroup> _groups;
-		private final int _rl;
-		private final int _ru;
-		private final MatrixBlock _ret;
-		private final AggregateUnaryOperator _op;
-
-		protected UnaryAggregateTask(List<ColGroup> groups, MatrixBlock ret, int rl, int ru,
-			AggregateUnaryOperator op) {
-			_groups = groups;
-			_op = op;
-			_rl = rl;
-			_ru = ru;
-
-			if(_op.indexFn instanceof ReduceAll) { // sum
-				_ret = new MatrixBlock(ret.getNumRows(), ret.getNumColumns(), false);
-				_ret.allocateDenseBlock();
-				if(_op.aggOp.increOp.fn instanceof Builtin)
-					System.arraycopy(ret.getDenseBlockValues(),
-						0,
-						_ret.getDenseBlockValues(),
-						0,
-						ret.getNumRows() * ret.getNumColumns());
-			}
-			else { // colSums
-				_ret = ret;
-			}
-		}
-
-		@Override
-		public MatrixBlock call() {
-			aggregateUnaryOperations(_op, _groups, _ret, _rl, _ru);
-			return _ret;
-		}
-	}
-
 	private static class DecompressTask implements Callable<Long> {
 		private final List<ColGroup> _colGroups;
 		private final MatrixBlock _ret;
@@ -907,7 +699,6 @@
 		public Long call() {
 
 			// preallocate sparse rows to avoid repeated alloc
-
 			if(_ret.isInSparseFormat()) {
 				int[] rnnz = new int[_ru - _rl];
 				for(ColGroup grp : _colGroups)
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
index d0629ff..a03a709 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
@@ -115,8 +115,8 @@
 
 		for(int i = rl; i < ru; i++, offT++)
 			for(int j = 0; j < ncol; j++) {
-				double v = target.quickGetValue(i, _colIndexes[j]);
-				target.setValue(i, _colIndexes[j], values[j] + v);
+				double v = target.quickGetValue(offT, _colIndexes[j]);
+				target.setValue(offT, _colIndexes[j], values[j] + v);
 			}
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java b/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java
new file mode 100644
index 0000000..4eb1970
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.lib;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysds.runtime.compress.CompressionSettings;
+import org.apache.sysds.runtime.compress.colgroup.ColGroup;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed;
+import org.apache.sysds.runtime.functionobjects.Builtin;
+import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode;
+import org.apache.sysds.runtime.functionobjects.KahanFunction;
+import org.apache.sysds.runtime.functionobjects.KahanPlus;
+import org.apache.sysds.runtime.functionobjects.KahanPlusSq;
+import org.apache.sysds.runtime.functionobjects.Mean;
+import org.apache.sysds.runtime.functionobjects.Plus;
+import org.apache.sysds.runtime.functionobjects.ReduceAll;
+import org.apache.sysds.runtime.functionobjects.ReduceCol;
+import org.apache.sysds.runtime.functionobjects.ReduceRow;
+import org.apache.sysds.runtime.instructions.cp.KahanObject;
+import org.apache.sysds.runtime.matrix.data.LibMatrixAgg;
+import org.apache.sysds.runtime.matrix.data.LibMatrixBincell;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
+import org.apache.sysds.runtime.util.CommonThreadPool;
+
+public class LibCompAgg {
+
+    // private static final Log LOG = LogFactory.getLog(LibCompAgg.class.getName());
+
+    /** Threshold for when to parallelize the aggregation functions. */
+    private static final long MIN_PAR_AGG_THRESHOLD = 8 * 1024 * 1024; // 8MB
+
+    /** Thread-local matrix block, reused by each worker thread for materializing decompressed groups. */
+    private static ThreadLocal<MatrixBlock> memPool = new ThreadLocal<MatrixBlock>() {
+        @Override
+        protected MatrixBlock initialValue() {
+            return null;
+        }
+    };
+
+    public static MatrixBlock aggregateUnary(CompressedMatrixBlock m1, MatrixBlock ret, AggregateUnaryOperator op,
+        int blen, MatrixIndexes indexesIn, boolean inCP) {
+
+        fillStart(ret, op);
+
+        // core unary aggregate
+        if(op.getNumThreads() > 1 && m1.getExactSizeOnDisk() > MIN_PAR_AGG_THRESHOLD) {
+            // multi-threaded execution of all groups
+            ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning(m1.getColGroups(),
+                (op.indexFn instanceof ReduceCol) ? 1 : op.getNumThreads(),
+                false);
+
+            ColGroupUncompressed uc = m1.getUncompressedColGroup();
+
+            try {
+                // compute uncompressed column group in parallel (otherwise bottleneck)
+                if(uc != null)
+                    uc.unaryAggregateOperations(op, ret);
+                // compute all compressed column groups
+                ExecutorService pool = CommonThreadPool.get(op.getNumThreads());
+                ArrayList<UnaryAggregateTask> tasks = new ArrayList<>();
+                if(op.indexFn instanceof ReduceCol && grpParts.length > 0) {
+                    final int blkz = CompressionSettings.BITMAP_BLOCK_SZ;
+                    int blklen = (int) Math.ceil((double) m1.getNumRows() / op.getNumThreads());
+                    blklen += (blklen % blkz != 0) ? blkz - blklen % blkz : 0;
+                    for(int i = 0; i < op.getNumThreads() & i * blklen < m1.getNumRows(); i++) {
+                        tasks.add(new UnaryAggregateTask(grpParts[0], ret, i * blklen,
+                            Math.min((i + 1) * blklen, m1.getNumRows()), op));
+
+                    }
+                }
+                else
+                    for(ArrayList<ColGroup> grp : grpParts) {
+                        if(grp != null)
+                            tasks.add(new UnaryAggregateTask(grp, ret, 0, m1.getNumRows(), op));
+                    }
+                List<Future<MatrixBlock>> rtasks = pool.invokeAll(tasks);
+                pool.shutdown();
+
+                // aggregate partial results
+                if(op.indexFn instanceof ReduceAll) {
+                    if(op.aggOp.increOp.fn instanceof KahanFunction) {
+                        KahanObject kbuff = new KahanObject(ret.quickGetValue(0, 0), 0);
+                        for(Future<MatrixBlock> rtask : rtasks) {
+                            double tmp = rtask.get().quickGetValue(0, 0);
+                            ((KahanFunction) op.aggOp.increOp.fn).execute2(kbuff, tmp);
+                        }
+                        ret.quickSetValue(0, 0, kbuff._sum);
+                    }
+                    else if(op.aggOp.increOp.fn instanceof Mean) {
+                        double val = ret.quickGetValue(0, 0);
+                        for(Future<MatrixBlock> rtask : rtasks) {
+                            double tmp = rtask.get().quickGetValue(0, 0);
+                            val = val + tmp;
+                        }
+                        ret.quickSetValue(0, 0, val);
+                    }
+                    else {
+                        double val = ret.quickGetValue(0, 0);
+                        for(Future<MatrixBlock> rtask : rtasks) {
+                            double tmp = rtask.get().quickGetValue(0, 0);
+                            val = op.aggOp.increOp.fn.execute(val, tmp);
+                        }
+                        ret.quickSetValue(0, 0, val);
+                    }
+                }
+            }
+            catch(InterruptedException | ExecutionException e) {
+                throw new DMLRuntimeException(e);
+            }
+        }
+        else {
+            if(m1.getColGroups() != null) {
+
+                for(ColGroup grp : m1.getColGroups())
+                    if(grp instanceof ColGroupUncompressed)
+                        ((ColGroupUncompressed) grp).unaryAggregateOperations(op, ret);
+                aggregateUnaryOperations(op, m1.getColGroups(), ret, 0, m1.getNumRows());
+            }
+        }
+
+        // special handling zeros for rowmins/rowmax
+        if(op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof Builtin) {
+            int[] rnnz = new int[m1.getNumRows()];
+            for(ColGroup grp : m1.getColGroups())
+                grp.countNonZerosPerRow(rnnz, 0, m1.getNumRows());
+            Builtin builtin = (Builtin) op.aggOp.increOp.fn;
+            for(int i = 0; i < m1.getNumRows(); i++)
+                if(rnnz[i] < m1.getNumColumns())
+                    ret.quickSetValue(i, 0, builtin.execute(ret.quickGetValue(i, 0), 0));
+        }
+
+        // special handling of mean
+        if(op.aggOp.increOp.fn instanceof Mean) {
+            if(op.indexFn instanceof ReduceAll) {
+                ret.quickSetValue(0, 0, ret.quickGetValue(0, 0) / (m1.getNumColumns() * m1.getNumRows()));
+            }
+            else if(op.indexFn instanceof ReduceCol) {
+                for(int i = 0; i < m1.getNumRows(); i++) {
+                    ret.quickSetValue(i, 0, ret.quickGetValue(i, 0) / m1.getNumColumns());
+                }
+            }
+            else if(op.indexFn instanceof ReduceRow)
+                for(int i = 0; i < m1.getNumColumns(); i++) {
+                    ret.quickSetValue(0, i, ret.quickGetValue(0, i) / m1.getNumRows());
+                }
+        }
+
+        // drop correction if necessary
+        if(op.aggOp.existsCorrection() && inCP)
+            ret.dropLastRowsOrColumns(op.aggOp.correction);
+
+        ret.recomputeNonZeros();
+        return ret;
+    }
+
+    public static MatrixBlock aggregateUnaryOverlapping(CompressedMatrixBlock m1, MatrixBlock ret,
+        AggregateUnaryOperator op, int blen, MatrixIndexes indexesIn, boolean inCP) {
+
+        if(!(op.aggOp.increOp.fn instanceof KahanPlusSq || (op.aggOp.increOp.fn instanceof Builtin &&
+            (((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
+                ((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)))) {
+            throw new DMLRuntimeException("Overlapping aggregates is not valid for op: " + op.aggOp.increOp.fn);
+        }
+
+        fillStart(ret, op);
+
+        try {
+            // compute all compressed column groups
+            ExecutorService pool = CommonThreadPool.get(op.getNumThreads());
+            ArrayList<UnaryAggregateOverlappingTask> tasks = new ArrayList<>();
+            final int blklen = CompressionSettings.BITMAP_BLOCK_SZ / m1.getNumColumns();
+
+            for(int i = 0; i * blklen < m1.getNumRows(); i++) {
+                tasks.add(new UnaryAggregateOverlappingTask(m1.getColGroups(), ret, i * blklen,
+                    Math.min((i + 1) * blklen, m1.getNumRows()), op));
+            }
+
+            List<Future<MatrixBlock>> rtasks = pool.invokeAll(tasks);
+            pool.shutdown();
+
+            if(op.indexFn instanceof ReduceAll || (ret.getNumColumns() == 1 && ret.getNumRows() == 1)) {
+                if(op.aggOp.increOp.fn instanceof KahanFunction) {
+                    KahanObject kbuff = new KahanObject(ret.quickGetValue(0, 0), 0);
+                    KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+                    for(Future<MatrixBlock> rtask : rtasks) {
+                        double tmp = rtask.get().quickGetValue(0, 0);
+                        kplus.execute2(kbuff, tmp);
+                    }
+                    ret.quickSetValue(0, 0, kbuff._sum);
+                }
+                else {
+                    double val = ret.quickGetValue(0, 0);
+                    for(Future<MatrixBlock> rtask : rtasks) {
+                        double tmp = rtask.get().quickGetValue(0, 0);
+                        val = op.aggOp.increOp.fn.execute(val, tmp);
+                    }
+                    ret.quickSetValue(0, 0, val);
+                }
+
+                ret.recomputeNonZeros();
+            }
+            else if(op.indexFn instanceof ReduceCol) {
+                // LOG.error("Here");
+                long nnz = 0;
+                for(int i = 0; i * blklen < m1.getNumRows(); i++) {
+                    MatrixBlock tmp = rtasks.get(i).get();
+                    for(int row = 0, off = i * blklen; row < tmp.getNumRows(); row++, off++) {
+                        ret.quickSetValue(off, 0, tmp.quickGetValue(row, 0));
+                        nnz += ret.quickGetValue(off, 0) == 0 ? 0 : 1;
+                    }
+                }
+                ret.setNonZeros(nnz);
+            }
+            else {
+                for(Future<MatrixBlock> rtask : rtasks) {
+                    LibMatrixBincell.bincellOp(rtask.get(),
+                        ret,
+                        ret,
+                        (op.aggOp.increOp.fn instanceof KahanFunction) ? new BinaryOperator(
+                            Plus.getPlusFnObject()) : op.aggOp.increOp);
+                }
+            }
+        }
+        catch(InterruptedException | ExecutionException e) {
+            throw new DMLRuntimeException(e);
+        }
+
+        if(op.aggOp.existsCorrection() && inCP)
+            ret.dropLastRowsOrColumns(op.aggOp.correction);
+
+        return ret;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static ArrayList<ColGroup>[] createStaticTaskPartitioning(List<ColGroup> colGroups, int k,
+        boolean inclUncompressed) {
+        // special case: single uncompressed col group
+        if(colGroups.size() == 1 && colGroups.get(0) instanceof ColGroupUncompressed) {
+            return new ArrayList[0];
+        }
+
+        // initialize round robin col group distribution
+        // (static task partitioning to reduce mem requirements/final agg)
+        int numTasks = Math.min(k, colGroups.size());
+        ArrayList<ColGroup>[] grpParts = new ArrayList[numTasks];
+        int pos = 0;
+        for(ColGroup grp : colGroups) {
+            if(grpParts[pos] == null)
+                grpParts[pos] = new ArrayList<>();
+            if(inclUncompressed || !(grp instanceof ColGroupUncompressed)) {
+                grpParts[pos].add(grp);
+                pos = (pos == numTasks - 1) ? 0 : pos + 1;
+            }
+        }
+
+        return grpParts;
+    }
+
+    private static void aggregateUnaryOperations(AggregateUnaryOperator op, List<ColGroup> groups, MatrixBlock ret,
+        int rl, int ru) {
+
+        // note: uncompressed (UC) groups are skipped here; the caller aggregates them separately
+        double[] c = ret.getDenseBlockValues();
+        for(ColGroup grp : groups)
+            if(grp != null && !(grp instanceof ColGroupUncompressed))
+                grp.unaryAggregateOperations(op, c, rl, ru);
+
+    }
+
+    private static class UnaryAggregateTask implements Callable<MatrixBlock> {
+        private final List<ColGroup> _groups;
+        private final int _rl;
+        private final int _ru;
+        private final MatrixBlock _ret;
+        private final AggregateUnaryOperator _op;
+
+        protected UnaryAggregateTask(List<ColGroup> groups, MatrixBlock ret, int rl, int ru,
+            AggregateUnaryOperator op) {
+            _groups = groups;
+            _op = op;
+            _rl = rl;
+            _ru = ru;
+
+            if(_op.indexFn instanceof ReduceAll) { // sum
+                _ret = new MatrixBlock(ret.getNumRows(), ret.getNumColumns(), false);
+                _ret.allocateDenseBlock();
+                if(_op.aggOp.increOp.fn instanceof Builtin)
+                    System.arraycopy(ret.getDenseBlockValues(),
+                        0,
+                        _ret.getDenseBlockValues(),
+                        0,
+                        ret.getNumRows() * ret.getNumColumns());
+            }
+            else { // colSums
+                _ret = ret;
+            }
+        }
+
+        @Override
+        public MatrixBlock call() {
+            aggregateUnaryOperations(_op, _groups, _ret, _rl, _ru);
+            return _ret;
+        }
+    }
+
+    private static class UnaryAggregateOverlappingTask implements Callable<MatrixBlock> {
+        private final List<ColGroup> _groups;
+        private final int _rl;
+        private final int _ru;
+        private final MatrixBlock _ret;
+        private final AggregateUnaryOperator _op;
+
+        protected UnaryAggregateOverlappingTask(List<ColGroup> groups, MatrixBlock ret, int rl, int ru,
+            AggregateUnaryOperator op) {
+            _groups = groups;
+            _op = op;
+            _rl = rl;
+            _ru = ru;
+            if(_op.indexFn instanceof ReduceAll) {
+                _ret = new MatrixBlock(ret.getNumRows(), ret.getNumColumns(), false);
+                _ret.allocateDenseBlock();
+            }
+            else if(_op.indexFn instanceof ReduceCol) {
+                _ret = new MatrixBlock(ru - rl, ret.getNumColumns(), false);
+                _ret.allocateDenseBlock();
+            }
+            else {
+                _ret = new MatrixBlock(ret.getNumRows(), ret.getNumColumns(), false);
+                _ret.allocateDenseBlock();
+            }
+            if(_op.aggOp.increOp.fn instanceof Builtin) {
+                System.arraycopy(ret
+                    .getDenseBlockValues(), 0, _ret.getDenseBlockValues(), 0, _ret.getDenseBlockValues().length);
+            }
+
+        }
+
+        @Override
+        public MatrixBlock call() {
+            MatrixBlock tmp = memPool.get();
+            if(tmp == null) {
+                memPool.set(new MatrixBlock(_ru - _rl, _groups.get(0).getNumCols(), false, -1).allocateBlock());
+                tmp = memPool.get();
+            }
+            else {
+                tmp = memPool.get();
+                tmp.reset(_ru - _rl, _groups.get(0).getNumCols(), false, -1);
+            }
+
+            for(ColGroup g : _groups) {
+                g.decompressToBlock(tmp, _rl, _ru, 0, g.getValues());
+            }
+
+            LibMatrixAgg.aggregateUnaryMatrix(tmp, _ret, _op);
+            return _ret;
+        }
+    }
+
+    private static void fillStart(MatrixBlock ret, AggregateUnaryOperator op) {
+        if(op.aggOp.increOp.fn instanceof Builtin) {
+            Double val = null;
+            switch(((Builtin) op.aggOp.increOp.fn).getBuiltinCode()) {
+                case MAX:
+                    val = Double.NEGATIVE_INFINITY;
+                    break;
+                case MIN:
+                    val = Double.POSITIVE_INFINITY;
+                    break;
+                default:
+                    break;
+            }
+            if(val != null) {
+                ret.getDenseBlock().set(val);
+            }
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java b/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java
index fd5a084..aa198d4 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java
@@ -54,6 +54,9 @@
 			ret = new MatrixBlock(rl, cl, false, rl * cl);
 		else if(!(ret.getNumColumns() == cl && ret.getNumRows() == rl && ret.isAllocated()))
 			ret.reset(rl, cl, false, rl * cl);
+		if(that instanceof CompressedMatrixBlock){
+			LOG.info("Decompression Left side Matrix (Should not really happen)");
+		}
 		that = that instanceof CompressedMatrixBlock ? ((CompressedMatrixBlock) that).decompress() : that;
 
 		// if(that.getNumRows() == 1) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibRelationalOp.java b/src/main/java/org/apache/sysds/runtime/compress/lib/LibRelationalOp.java
index 1b48adc..db87310 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibRelationalOp.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibRelationalOp.java
@@ -202,7 +202,7 @@
             }
         }
         else {
-            final int blkz = 65536 / ret.getNumColumns();
+            final int blkz = CompressionSettings.BITMAP_BLOCK_SZ / ret.getNumColumns();
             ExecutorService pool = CommonThreadPool.get(k);
             ArrayList<RelationalTask> tasks = new ArrayList<>();
             try {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java b/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java
index 761e5a4..a6b4219 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java
@@ -60,6 +60,11 @@
 	public static MatrixBlock rightMultByMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k,
 		Pair<Integer, int[]> v, boolean allowOverlap) {
 
+		if(that instanceof CompressedMatrixBlock){
+			LOG.info("Decompression Right matrix");
+		}
+		that = that instanceof CompressedMatrixBlock ? ((CompressedMatrixBlock) that).decompress() : that;
+
 		boolean containsUncompressable = false;
 		int distinctCount = 0;
 		for(ColGroup g : colGroups) {
diff --git a/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java b/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java
index df15499..8cce3d0 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java
@@ -174,6 +174,12 @@
 			// compare result with input
 			double[][] d1 = DataConverter.convertToDoubleMatrix(ret1);
 			double[][] d2 = DataConverter.convertToDoubleMatrix(ret2);
+			// for(double[] row : d1) {
+			// 	LOG.error(Arrays.toString(row));
+			// }
+			// for(double[] row : d2) {
+			// 	LOG.error(Arrays.toString(row));
+			// }
 			int dim1 = (aggType == AggType.ROWSUMS || aggType == AggType.ROWSUMSSQ || aggType == AggType.ROWMAXS ||
 				aggType == AggType.ROWMINS || aggType == AggType.ROWMEAN) ? rows : 1;
 			int dim2 = (aggType == AggType.COLSUMS || aggType == AggType.COLSUMSSQ || aggType == AggType.COLMAXS ||
@@ -211,9 +217,9 @@
 			else {
 				if(aggType == AggType.ROWMEAN)
 					TestUtils.compareMatrices(d1, d2, 0.0001, css);
-				else if(overlappingType == OverLapping.MATRIX_MULT_NEGATIVE ||
+				else if(overlappingType == OverLapping.COL || overlappingType == OverLapping.MATRIX_MULT_NEGATIVE ||
 					overlappingType == OverLapping.MATRIX_PLUS || overlappingType == OverLapping.MATRIX)
-					TestUtils.compareMatricesBitAvgDistance(d1, d2, 8192, 128, css);
+					TestUtils.compareMatricesBitAvgDistance(d1, d2, 32768, 128, css);
 				else
 					TestUtils.compareMatricesBitAvgDistance(d1, d2, 2048, 128, css);
 			}
diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java
index 76fe85d..f0cb1ea 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java
@@ -75,7 +75,7 @@
 					else if(overlappingType == OverLapping.MATRIX_MULT_NEGATIVE ||
 						overlappingType == OverLapping.MATRIX_PLUS || overlappingType == OverLapping.MATRIX ||
 						overlappingType == OverLapping.COL)
-						TestUtils.compareScalarBitsJUnit(ulaVal, claVal, 8192);
+						TestUtils.compareScalarBitsJUnit(ulaVal, claVal, 32768);
 					else
 						TestUtils.compareScalarBitsJUnit(ulaVal, claVal, 0); // Should be exactly same value
 
@@ -112,7 +112,7 @@
 
 			else if(overlappingType == OverLapping.MATRIX_MULT_NEGATIVE || overlappingType == OverLapping.MATRIX_PLUS ||
 				overlappingType == OverLapping.MATRIX || overlappingType == OverLapping.COL)
-				TestUtils.compareMatricesBitAvgDistance(d1, d2, 8192, 128, this.toString());
+				TestUtils.compareMatricesBitAvgDistance(d1, d2, 32768, 128, this.toString());
 			else
 				TestUtils.compareMatricesBitAvgDistance(d1, d2, 0, 1, "Test Append Matrix");
 
@@ -188,7 +188,7 @@
 				TestUtils.compareMatrices(d1, d2, lossyTolerance, this.toString());
 			else if(overlappingType == OverLapping.MATRIX_MULT_NEGATIVE || overlappingType == OverLapping.MATRIX_PLUS ||
 				overlappingType == OverLapping.MATRIX || overlappingType == OverLapping.COL)
-				TestUtils.compareMatricesBitAvgDistance(d1, d2, 8192, 128, this.toString());
+				TestUtils.compareMatricesBitAvgDistance(d1, d2, 32768, 128, this.toString());
 			else
 				TestUtils.compareMatricesBitAvgDistance(d1, d2, 0, 0, this.toString());
 
diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
index 83079dd..d49a60a 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
@@ -77,7 +77,8 @@
 	protected static ValueType[] usedValueTypes = new ValueType[] {
 		// ValueType.RAND,
 		// ValueType.CONST,
-		ValueType.RAND_ROUND, ValueType.OLE_COMPRESSIBLE,
+		ValueType.RAND_ROUND, 
+		ValueType.OLE_COMPRESSIBLE,
 		// ValueType.RLE_COMPRESSIBLE,
 	};
 
@@ -86,9 +87,11 @@
 		// ValueRange.BYTE
 	};
 
-	protected static OverLapping[] overLapping = new OverLapping[] {OverLapping.COL,
+	protected static OverLapping[] overLapping = new OverLapping[] {
+		OverLapping.COL,
 		// OverLapping.MATRIX,
-		OverLapping.NONE, OverLapping.MATRIX_PLUS,
+		OverLapping.NONE, 
+		OverLapping.MATRIX_PLUS,
 		// OverLapping.MATRIX_MULT_NEGATIVE
 	};
 
@@ -134,7 +137,7 @@
 
 	protected static MatrixTypology[] usedMatrixTypology = new MatrixTypology[] { // Selected Matrix Types
 		// MatrixTypology.SMALL, MatrixTypology.FEW_COL,
-		// MatrixTypology.FEW_ROW,
+		MatrixTypology.FEW_ROW,
 		// MatrixTypology.LARGE,
 		// MatrixTypology.SINGLE_COL,
 		// MatrixTypology.SINGLE_ROW,
@@ -258,7 +261,7 @@
 				TestUtils.compareMatrices(org, deCompressed, lossyTolerance, this.toString());
 			else if(overlappingType == OverLapping.MATRIX_MULT_NEGATIVE || overlappingType == OverLapping.MATRIX_PLUS ||
 				overlappingType == OverLapping.MATRIX || overlappingType == OverLapping.COL)
-				TestUtils.compareMatricesBitAvgDistance(org, deCompressed, 8192, 124, this.toString());
+				TestUtils.compareMatricesBitAvgDistance(org, deCompressed, 32768, 124, this.toString());
 			else
 				TestUtils.compareMatricesBitAvgDistance(org, deCompressed, 5, 1, this.toString());
 
@@ -499,13 +502,13 @@
 			}
 			else {
 				if(rows > 65000)
-					TestUtils.compareMatricesPercentageDistance(d1, d2, 0.5, 0.99, compressionSettings.toString());
+					TestUtils.compareMatricesPercentageDistance(d1, d2, 0.5, 0.99, this.toString());
 				else if(overlappingType == OverLapping.MATRIX_MULT_NEGATIVE ||
 					overlappingType == OverLapping.MATRIX_PLUS || overlappingType == OverLapping.MATRIX ||
 					overlappingType == OverLapping.COL)
 					TestUtils.compareMatricesBitAvgDistance(d1, d2, 1600000, 1000, this.toString());
 				else
-					TestUtils.compareMatricesBitAvgDistance(d1, d2, 10000, 500, compressionSettings.toString());
+					TestUtils.compareMatricesBitAvgDistance(d1, d2, 32768, 500, this.toString());
 
 			}
 		}