[SYSTEMDS-2787] Compression Steps Reorganization

This commit contains various changes (some massive ones).
The biggest change is the ordering of the compression steps: we now
classify first on a sample of the data, since experiments showed this to
be 10-30% faster. Furthermore, this allows us to abort compression at a
lower cost if compression turns out not to be worthwhile.

Overall Compression time for covtype went from
 - ~1.0 to 0.36 sec (including read from disk) 0.11 sec compression

Furthermore, unlike before, the transpose is now chosen heuristically,
since it is more efficient not to transpose the matrix for compression in
some cases.

- Compressed Sparse matrix multiplication fix
- modified matrix multiplication to push down information of
  transposing to the ba+* op. to allow not decompressing the matrix.
- Configuration option of enabling and disabling overlapping compression.
- decompress row section direct access to the matrix block not using
  quick set/get.
- adding safe boolean to decompress to specify if management of
  nnz should be done. This allows the decompression of intermediates at
  near half the computation cost.
- Add configuration for the sampling ratio, defaulting to 0.01 but with a
  minimum sample size of 2000 elements.
- DML Config settings for Cocode-Compression method default to COST
- add support for right sparse matrix multiplication with overlapping
  output. Further improvements are on the way.
- Compression statistics are added when statistics and compression are
  enabled.
- Readers for extracting bitmaps are optimized for either transposed or
  untransposed matrices giving 5-15% improved performance.
- Hashmaps are modified to improve insertion time, since previously they
  would hash values twice; this gives 10% improved performance. Furthermore,
  the default sizes are modified to start smaller.
- Additional tests for multiplication to cover different edge cases.
diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index 4fb32e7..3751dec 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -70,6 +70,10 @@
 	public static final String COMPRESSED_LINALG    = "sysds.compressed.linalg"; //auto, cost, true, false
 	public static final String COMPRESSED_LOSSY     = "sysds.compressed.lossy";
 	public static final String COMPRESSED_VALID_COMPRESSIONS = "sysds.compressed.valid.compressions";
+	public static final String COMPRESSED_OVERLAPPING = "sysds.compressed.overlapping"; // true, false
+	public static final String COMPRESSED_SAMPLING_RATIO = "sysds.compressed.sampling.ratio"; // 0.1
+	public static final String COMPRESSED_COCODE    = "sysds.compressed.cocode"; // COST
+	public static final String COMPRESSED_TRANSPOSE = "sysds.compressed.transpose"; // true, false, auto.
 	public static final String NATIVE_BLAS          = "sysds.native.blas";
 	public static final String NATIVE_BLAS_DIR      = "sysds.native.blas.directory";
 	public static final String CODEGEN              = "sysds.codegen.enabled"; //boolean
@@ -121,6 +125,10 @@
 		_defaultVals.put(COMPRESSED_LINALG,      Compression.CompressConfig.AUTO.name() );
 		_defaultVals.put(COMPRESSED_LOSSY,       "false" );
 		_defaultVals.put(COMPRESSED_VALID_COMPRESSIONS, "DDC,OLE,RLE");
+		_defaultVals.put(COMPRESSED_OVERLAPPING, "false" );
+		_defaultVals.put(COMPRESSED_SAMPLING_RATIO, "0.01");
+		_defaultVals.put(COMPRESSED_COCODE,      "COST");
+		_defaultVals.put(COMPRESSED_TRANSPOSE,   "auto");
 		_defaultVals.put(CODEGEN,                "false" );
 		_defaultVals.put(CODEGEN_API,		     GeneratorAPI.JAVA.name() );
 		_defaultVals.put(CODEGEN_COMPILER,       CompilerType.AUTO.name() );
@@ -385,7 +393,8 @@
 		String[] tmpConfig = new String[] { 
 			LOCAL_TMP_DIR,SCRATCH_SPACE,OPTIMIZATION_LEVEL, DEFAULT_BLOCK_SIZE,
 			CP_PARALLEL_OPS, CP_PARALLEL_IO, NATIVE_BLAS, NATIVE_BLAS_DIR,
-			COMPRESSED_LINALG, COMPRESSED_LOSSY, COMPRESSED_VALID_COMPRESSIONS,
+			COMPRESSED_LINALG, COMPRESSED_LOSSY, COMPRESSED_VALID_COMPRESSIONS, COMPRESSED_OVERLAPPING,
+			COMPRESSED_SAMPLING_RATIO, COMPRESSED_COCODE, COMPRESSED_TRANSPOSE,
 			CODEGEN, CODEGEN_API, CODEGEN_COMPILER, CODEGEN_OPTIMIZER, CODEGEN_PLANCACHE, CODEGEN_LITERALS,
 			STATS_MAX_WRAP_LEN, PRINT_GPU_MEMORY_INFO,
 			AVAILABLE_GPUS, SYNCHRONIZE_GPU, EAGER_CUDA_FREE, FLOATING_POINT_PRECISION, GPU_EVICTION_POLICY, 
diff --git a/src/main/java/org/apache/sysds/hops/AggBinaryOp.java b/src/main/java/org/apache/sysds/hops/AggBinaryOp.java
index a04d267..5dcc5ee 100644
--- a/src/main/java/org/apache/sysds/hops/AggBinaryOp.java
+++ b/src/main/java/org/apache/sysds/hops/AggBinaryOp.java
@@ -27,6 +27,7 @@
 import org.apache.sysds.common.Types.OpOp2;
 import org.apache.sysds.common.Types.ReOrgOp;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.rewrite.HopRewriteUtils;
 import org.apache.sysds.lops.Lop;
 import org.apache.sysds.lops.LopProperties.ExecType;
@@ -598,7 +599,7 @@
 	private void constructCPLopsMM(ExecType et) 
 	{
 		Lop matmultCP = null;
-		
+		String cla = ConfigurationManager.getDMLConfig().getTextValue("sysds.compressed.linalg");
 		if (et == ExecType.GPU) {
 			Hop h1 = getInput().get(0);
 			Hop h2 = getInput().get(1);
@@ -615,6 +616,18 @@
 			matmultCP = new MatMultCP(left, right, getDataType(), getValueType(), et, leftTrans, rightTrans);
 			setOutputDimensions(matmultCP);
 		}
+		else if (cla.equals("true") || cla.equals("cost")){
+			Hop h1 = getInput().get(0);
+			Hop h2 = getInput().get(1);
+			int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads);
+			boolean leftTrans = HopRewriteUtils.isTransposeOperation(h1);
+			boolean rightTrans =  HopRewriteUtils.isTransposeOperation(h2);
+			Lop left = !leftTrans ? h1.constructLops() :
+				h1.getInput().get(0).constructLops();
+			Lop right = !rightTrans ? h2.constructLops() :
+				h2.getInput().get(0).constructLops();
+			matmultCP = new MatMultCP(left, right, getDataType(), getValueType(), et, k, leftTrans, rightTrans);
+		}
 		else {
 			if( isLeftTransposeRewriteApplicable(true) ) {
 				matmultCP = constructCPLopsMMWithLeftTransposeRewrite();
diff --git a/src/main/java/org/apache/sysds/lops/MatMultCP.java b/src/main/java/org/apache/sysds/lops/MatMultCP.java
index 4f2c9cd..056a5c7 100644
--- a/src/main/java/org/apache/sysds/lops/MatMultCP.java
+++ b/src/main/java/org/apache/sysds/lops/MatMultCP.java
@@ -24,58 +24,69 @@
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.ValueType;
 
-public class MatMultCP extends Lop 
-{
+public class MatMultCP extends Lop {
 	private int numThreads = -1;
 	private boolean isLeftTransposed; // Used for GPU matmult operation
 	private boolean isRightTransposed;
-	
+	private boolean useTranspose;
+
 	public MatMultCP(Lop input1, Lop input2, DataType dt, ValueType vt, ExecType et) {
 		this(input1, input2, dt, vt, et, 1);
 	}
-	
+
 	public MatMultCP(Lop input1, Lop input2, DataType dt, ValueType vt, ExecType et, int k) {
 		super(Lop.Type.MatMultCP, dt, vt);
 		init(input1, input2, dt, vt, et);
 		numThreads = k;
 	}
-	
-	public MatMultCP(Lop input1, Lop input2, DataType dt, ValueType vt, ExecType et, 
-			boolean isLeftTransposed, boolean isRightTransposed) {
+
+	public MatMultCP(Lop input1, Lop input2, DataType dt, ValueType vt, ExecType et, boolean isLeftTransposed,
+		boolean isRightTransposed) {
 		super(Lop.Type.Binary, dt, vt);
 		init(input1, input2, dt, vt, et);
 		this.isLeftTransposed = isLeftTransposed;
 		this.isRightTransposed = isRightTransposed;
+		this.useTranspose = true;
 	}
-	
+
+	public MatMultCP(Lop input1, Lop input2, DataType dt, ValueType vt, ExecType et, int k, boolean isLeftTransposed,
+		boolean isRightTransposed) {
+		this(input1, input2, dt, vt, et, k);
+		this.isLeftTransposed = isLeftTransposed;
+		this.isRightTransposed = isRightTransposed;
+		this.useTranspose = true;
+	}
+
 	private void init(Lop input1, Lop input2, DataType dt, ValueType vt, ExecType et) {
 		addInput(input1);
 		addInput(input2);
 		input1.addOutput(this);
 		input2.addOutput(this);
-		lps.setProperties( inputs, et);
+		lps.setProperties(inputs, et);
 	}
 
 	@Override
 	public String toString() {
 		return " Operation: ba+*";
 	}
-	
+
 	@Override
 	public String getInstructions(String input1, String input2, String output) {
-		if( getExecType() == ExecType.CP ) {
-			return InstructionUtils.concatOperands(
-				getExecType().name(), "ba+*",
+		if(!useTranspose) {
+			return InstructionUtils.concatOperands(getExecType().name(),
+				"ba+*",
 				getInputs().get(0).prepInputOperand(input1),
 				getInputs().get(1).prepInputOperand(input2),
-				prepOutputOperand(output), String.valueOf(numThreads));
+				prepOutputOperand(output),
+				String.valueOf(numThreads));
 		}
-		else { //GPU
-			return InstructionUtils.concatOperands(
-				getExecType().name(), "ba+*",
+		else { // GPU or compressed
+			return InstructionUtils.concatOperands(getExecType().name(),
+				"ba+*",
 				getInputs().get(0).prepInputOperand(input1),
 				getInputs().get(1).prepInputOperand(input2),
-				prepOutputOperand(output), String.valueOf(numThreads),
+				prepOutputOperand(output),
+				String.valueOf(numThreads),
 				String.valueOf(isLeftTransposed),
 				String.valueOf(isRightTransposed));
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/AbstractCompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/AbstractCompressedMatrixBlock.java
index aff136a..9368a21 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/AbstractCompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/AbstractCompressedMatrixBlock.java
@@ -50,7 +50,6 @@
 import org.apache.sysds.runtime.matrix.operators.COVOperator;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.matrix.operators.QuaternaryOperator;
-import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
 import org.apache.sysds.runtime.matrix.operators.TernaryOperator;
 import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
 import org.apache.sysds.runtime.util.IndexRange;
@@ -82,6 +81,7 @@
 
 	/**
 	 * Create a potentially overlapping Compressed Matrix Block.
+	 * 
 	 * @param overLapping boolean specifying if the matrix blocks columns are overlapping.
 	 */
 	public AbstractCompressedMatrixBlock(boolean overLapping) {
@@ -176,13 +176,6 @@
 	}
 
 	@Override
-	public MatrixBlock reorgOperations(ReorgOperator op, MatrixValue ret, int startRow, int startColumn, int length) {
-		printDecompressWarning("reorgOperations");
-		MatrixBlock tmp = decompress();
-		return tmp.reorgOperations(op, ret, startRow, startColumn, length);
-	}
-
-	@Override
 	public MatrixBlock append(MatrixBlock that, MatrixBlock ret, boolean cbind) {
 		if(cbind) // use supported operation
 			return append(that, ret);
@@ -395,13 +388,6 @@
 	}
 
 	@Override
-	public MatrixBlock replaceOperations(MatrixValue result, double pattern, double replacement) {
-		printDecompressWarning("replaceOperations");
-		MatrixBlock tmp = decompress();
-		return tmp.replaceOperations(result, pattern, replacement);
-	}
-
-	@Override
 	public void ctableOperations(Operator op, double scalar, MatrixValue that, CTableMap resultMap,
 		MatrixBlock resultBlock) {
 		printDecompressWarning("ctableOperations");
@@ -507,8 +493,9 @@
 		return(mb instanceof CompressedMatrixBlock);
 	}
 
-	protected static MatrixBlock getUncompressed(MatrixValue mVal) {
-		return isCompressed((MatrixBlock) mVal) ? ((CompressedMatrixBlock) mVal).decompress(OptimizerUtils.getConstrainedNumThreads(-1)) : (MatrixBlock) mVal;
+	public static MatrixBlock getUncompressed(MatrixValue mVal) {
+		return isCompressed((MatrixBlock) mVal) ? ((CompressedMatrixBlock) mVal)
+			.decompress(OptimizerUtils.getConstrainedNumThreads(-1)) : (MatrixBlock) mVal;
 	}
 
 	protected void printDecompressWarning(String operation) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/BitmapEncoder.java b/src/main/java/org/apache/sysds/runtime/compress/BitmapEncoder.java
index ba2f82d..5c0785b 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/BitmapEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/BitmapEncoder.java
@@ -33,6 +33,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.utils.ABitmap;
 import org.apache.sysds.runtime.compress.utils.Bitmap;
 import org.apache.sysds.runtime.compress.utils.BitmapLossy;
@@ -55,38 +56,43 @@
 	/**
 	 * Generate uncompressed bitmaps for a set of columns in an uncompressed matrix block.
 	 * 
-	 * @param colIndices   Indexes (within the block) of the columns to extract
-	 * @param rawBlock     An uncompressed matrix block; can be dense or sparse
-	 * @param compSettings The compression settings used for the compression.
+	 * @param colIndices Indexes (within the block) of the columns to extract
+	 * @param rawBlock   An uncompressed matrix block; can be dense or sparse
+	 * @param transposed Boolean specifying if the rawblock was transposed.
 	 * @return uncompressed bitmap representation of the columns
 	 */
-	public static ABitmap extractBitmap(int[] colIndices, MatrixBlock rawBlock, CompressionSettings compSettings) {
+	public static ABitmap extractBitmap(int[] colIndices, MatrixBlock rawBlock, boolean transposed) {
 		// note: no sparse column selection reader because low potential
 		// single column selection
 		Bitmap res = null;
 		if(colIndices.length == 1) {
-			res = extractBitmap(colIndices[0], rawBlock, compSettings);
+			res = extractBitmap(colIndices[0], rawBlock, transposed);
 		}
 		// multiple column selection (general case)
 		else {
 			ReaderColumnSelection reader = null;
-			if(rawBlock.isInSparseFormat() && compSettings.transposeInput)
-				reader = new ReaderColumnSelectionSparse(rawBlock, colIndices, compSettings);
+			if(rawBlock.isInSparseFormat() && transposed)
+				reader = new ReaderColumnSelectionSparseTransposed(rawBlock, colIndices);
+			else if(rawBlock.isInSparseFormat())
+				reader = new ReaderColumnSelectionSparse(rawBlock, colIndices);
+			else if(transposed)
+				reader = new ReaderColumnSelectionDenseTransposed(rawBlock, colIndices);
 			else
-				reader = new ReaderColumnSelectionDense(rawBlock, colIndices, compSettings);
+				reader = new ReaderColumnSelectionDense(rawBlock, colIndices);
+			try {
 
-			res = extractBitmap(colIndices, reader);
+				res = extractBitmap(colIndices, reader);
+			}
+			catch(Exception e) {
+				throw new DMLRuntimeException("Failed to extract bitmap", e);
+			}
 		}
-		if(compSettings.lossy) {
-			return makeBitmapLossy(res);
-		}
-		else {
-			return res;
-		}
+		return res;
+
 	}
 
 	public static ABitmap extractBitmap(int[] colIndices, int rows, BitSet rawBlock, CompressionSettings compSettings) {
-		ReaderColumnSelection reader = new ReaderColumnSelectionBitSet(rawBlock, rows, colIndices, compSettings);
+		ReaderColumnSelection reader = new ReaderColumnSelectionBitSet(rawBlock, rows, colIndices);
 		Bitmap res = extractBitmap(colIndices, reader);
 		return res;
 	}
@@ -96,58 +102,98 @@
 	 * 
 	 * It counts the instances of zero, but skips storing the values.
 	 * 
-	 * @param colIndex     The index of the column
-	 * @param rawBlock     The Raw matrix block (that can be transposed)
-	 * @param compSettings The Compression settings used, in this instance to know if the raw block is transposed.
+	 * @param colIndex   The index of the column
+	 * @param rawBlock   The Raw matrix block (that can be transposed)
+	 * @param transposed Boolean specifying if the rawBlock is transposed or not.
 	 * @return Bitmap containing the Information of the column.
 	 */
-	private static Bitmap extractBitmap(int colIndex, MatrixBlock rawBlock, CompressionSettings compSettings) {
+	private static Bitmap extractBitmap(int colIndex, MatrixBlock rawBlock, boolean transposed) {
+		DoubleIntListHashMap hashMap = transposed ? extractHashMapTransposed(colIndex,
+			rawBlock) : extractHashMap(colIndex, rawBlock);
+		return makeBitmap(hashMap);
+	}
+
+	private static DoubleIntListHashMap extractHashMap(int colIndex, MatrixBlock rawBlock) {
 		// probe map for distinct items (for value or value groups)
 		DoubleIntListHashMap distinctVals = new DoubleIntListHashMap();
 
 		// scan rows and probe/build distinct items
-		final int m = compSettings.transposeInput ? rawBlock.getNumColumns() : rawBlock.getNumRows();
-		int numZeros = 0;
+		final int m = rawBlock.getNumRows();
 
-		if(rawBlock.isInSparseFormat() && compSettings.transposeInput) { // SPARSE and Transposed.
-			SparseBlock a = rawBlock.getSparseBlock();
-			if(a != null && !a.isEmpty(colIndex)) {
-				int apos = a.pos(colIndex);
-				int alen = a.size(colIndex);
-				numZeros = m - alen;
-				int[] aix = a.indexes(colIndex);
-				double[] avals = a.values(colIndex);
-
-				for(int j = apos; j < apos + alen; j++) {
-					IntArrayList lstPtr = distinctVals.get(avals[j]);
-					if(lstPtr == null) {
-						lstPtr = new IntArrayList();
-						distinctVals.appendValue(avals[j], lstPtr);
+		if((rawBlock.getNumRows() == 1 || rawBlock.getNumColumns() == 1) && !rawBlock.isInSparseFormat()) {
+			double[] values = rawBlock.getDenseBlockValues();
+			if(values != null)
+				for(int i = 0; i < values.length; i++) {
+					double val = values[i];
+					if(val != 0) {
+						distinctVals.appendValue(val, i);
 					}
-					lstPtr.appendValue(aix[j]);
+				}
+		}
+		else if(!rawBlock.isInSparseFormat() && rawBlock.getDenseBlock().blockSize() == 1) {
+			double[] values = rawBlock.getDenseBlockValues();
+			for(int i = 0, off = colIndex;
+				off < rawBlock.getNumRows() * rawBlock.getNumColumns();
+				i++, off += rawBlock.getNumColumns()) {
+				double val = values[off];
+				if(val != 0) {
+					distinctVals.appendValue(val, i);
 				}
 			}
 		}
 		else // GENERAL CASE
 		{
 			for(int i = 0; i < m; i++) {
-				double val = compSettings.transposeInput ? rawBlock.quickGetValue(colIndex, i) : rawBlock
-					.quickGetValue(i, colIndex);
+				double val = rawBlock.quickGetValue(i, colIndex);
 				if(val != 0) {
-					IntArrayList lstPtr = distinctVals.get(val);
-					if(lstPtr == null) {
-						lstPtr = new IntArrayList();
-						distinctVals.appendValue(val, lstPtr);
-					}
-					lstPtr.appendValue(i);
-				}
-				else {
-					numZeros++;
+					distinctVals.appendValue(val, i);
 				}
 			}
 		}
+		return distinctVals;
+	}
 
-		return makeBitmap(distinctVals, numZeros);
+	private static DoubleIntListHashMap extractHashMapTransposed(int colIndex, MatrixBlock rawBlock) {
+		// probe map for distinct items (for value or value groups)
+		DoubleIntListHashMap distinctVals = new DoubleIntListHashMap();
+
+		// scan rows and probe/build distinct items
+		final int m = rawBlock.getNumColumns();
+
+		if(rawBlock.isInSparseFormat()) { // SPARSE and Transposed.
+			SparseBlock a = rawBlock.getSparseBlock();
+			if(a != null && !a.isEmpty(colIndex)) {
+				int apos = a.pos(colIndex);
+				int alen = a.size(colIndex);
+				int[] aix = a.indexes(colIndex);
+				double[] avals = a.values(colIndex);
+
+				for(int j = apos; j < apos + alen; j++) {
+					distinctVals.appendValue(avals[j], aix[j]);
+				}
+			}
+		}
+		else if((rawBlock.getNumRows() == 1 || rawBlock.getNumColumns() == 1) && !rawBlock.isInSparseFormat()) {
+			double[] values = rawBlock.getDenseBlockValues();
+			if(values != null) {
+				for(int i = 0; i < values.length; i++) {
+					double val = values[i];
+					if(val != 0) {
+						distinctVals.appendValue(val, i);
+					}
+				}
+			}
+		}
+		else // GENERAL CASE
+		{
+			for(int i = 0; i < m; i++) {
+				double val = rawBlock.quickGetValue(colIndex, i);
+				if(val != 0) {
+					distinctVals.appendValue(val, i);
+				}
+			}
+		}
+		return distinctVals;
 	}
 
 	/**
@@ -187,38 +233,38 @@
 				lstPtr.appendValue(rowReader.getCurrentRowIndex());
 			}
 		}
-		return makeBitmap(distinctVals, colIndices.length, zero);
+		return makeBitmap(distinctVals, zero, colIndices.length);
 	}
 
 	/**
 	 * Make the multi column Bitmap.
 	 * 
-	 * @param distinctVals The distinct values fround in the columns selected.
-	 * @param numColumns   Number of columns
+	 * @param distinctVals The distinct values found in the columns selected.
 	 * @param numZeros     Number of zero rows. aka rows only containing zero values.
+	 * @param numCols      Number of columns
 	 * @return The Bitmap.
 	 */
-	private static Bitmap makeBitmap(DblArrayIntListHashMap distinctVals, int numColumns, int numZeros) {
+	private static Bitmap makeBitmap(DblArrayIntListHashMap distinctVals, int numZeros, int numCols) {
 		// added for one pass bitmap construction
 		// Convert inputs to arrays
-		int numVals = distinctVals.size();
-		int numCols = numColumns;
-		double[] values = new double[numVals * numCols];
-		IntArrayList[] offsetsLists = new IntArrayList[numVals];
-		int bitmapIx = 0;
-		for(DArrayIListEntry val : distinctVals.extractValues()) {
-			System.arraycopy(val.key.getData(), 0, values, bitmapIx * numCols, numCols);
-			offsetsLists[bitmapIx++] = val.value;
+		ArrayList<DArrayIListEntry> mapEntries = distinctVals.extractValues();
+		if(!mapEntries.isEmpty()) {
+
+			int numVals = distinctVals.size();
+			double[] values = new double[numVals * numCols];
+			IntArrayList[] offsetsLists = new IntArrayList[numVals];
+			int bitmapIx = 0;
+			for(DArrayIListEntry val : mapEntries) {
+				System.arraycopy(val.key.getData(), 0, values, bitmapIx * numCols, numCols);
+				offsetsLists[bitmapIx++] = val.value;
+			}
+
+			return new Bitmap(numCols, offsetsLists, numZeros, values);
+		}
+		else {
+			return new Bitmap(numCols, null, numZeros, null);
 		}
 
-		// HACK; we make sure that the first sparse unsafe operation assume
-		// that we have entries with zero values. This makes the first sparse
-		// unsafe operation slightly slower, if the input compressed matrix is
-		// fully dense, aka containing no zero values.
-		// This is required for multi-column colGroups.
-		numZeros = (numColumns > 1) ? numZeros + 1 : numZeros;
-
-		return new Bitmap(numCols, offsetsLists, numZeros, values);
 	}
 
 	/**
@@ -228,7 +274,7 @@
 	 * @param numZeros     Number of zero values in the matrix
 	 * @return The single column Bitmap.
 	 */
-	private static Bitmap makeBitmap(DoubleIntListHashMap distinctVals, int numZeros) {
+	private static Bitmap makeBitmap(DoubleIntListHashMap distinctVals) {
 		// added for one pass bitmap construction
 		// Convert inputs to arrays
 		int numVals = distinctVals.size();
@@ -240,7 +286,7 @@
 			offsetsLists[bitmapIx++] = val.value;
 		}
 
-		return new Bitmap(1, offsetsLists, numZeros, values);
+		return new Bitmap(1, offsetsLists, 1, values);
 	}
 
 	/**
@@ -249,7 +295,7 @@
 	 * @param ubm The Uncompressed version of the bitmap.
 	 * @return A bitmap.
 	 */
-	private static ABitmap makeBitmapLossy(Bitmap ubm) {
+	public static ABitmap makeBitmapLossy(Bitmap ubm) {
 		final double[] fp = ubm.getValues();
 		if(fp.length == 0) {
 			return ubm;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index 986a77d..eb4d820 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -39,8 +39,11 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.lops.MMTSJ.MMTSJType;
 import org.apache.sysds.lops.MapMultChain.ChainType;
+import org.apache.sysds.runtime.DMLCompressionException;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.colgroup.ColGroup;
 import org.apache.sysds.runtime.compress.colgroup.ColGroup.CompressionType;
@@ -62,8 +65,8 @@
 import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.data.SparseRow;
 import org.apache.sysds.runtime.functionobjects.Builtin;
-import org.apache.sysds.runtime.functionobjects.Divide;
 import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode;
+import org.apache.sysds.runtime.functionobjects.Divide;
 import org.apache.sysds.runtime.functionobjects.Equals;
 import org.apache.sysds.runtime.functionobjects.GreaterThan;
 import org.apache.sysds.runtime.functionobjects.GreaterThanEquals;
@@ -78,6 +81,7 @@
 import org.apache.sysds.runtime.functionobjects.NotEquals;
 import org.apache.sysds.runtime.functionobjects.Plus;
 import org.apache.sysds.runtime.functionobjects.PlusMultiply;
+import org.apache.sysds.runtime.functionobjects.SwapIndex;
 import org.apache.sysds.runtime.matrix.data.IJV;
 import org.apache.sysds.runtime.matrix.data.LibMatrixBincell;
 import org.apache.sysds.runtime.matrix.data.LibMatrixBincell.BinaryAccessType;
@@ -87,6 +91,7 @@
 import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
+import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
 import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
 import org.apache.sysds.runtime.util.CommonThreadPool;
 import org.apache.sysds.utils.DMLCompressionStatistics;
@@ -166,21 +171,24 @@
 		Timing time = new Timing(true);
 
 		// preallocation sparse rows to avoid repeated reallocations
-		MatrixBlock ret = (nonZeros == -1) ? new MatrixBlock(rlen, clen, false, -1)
-			.allocateBlock() : new MatrixBlock(rlen, clen, sparse, nonZeros).allocateBlock();
-		if(ret.isInSparseFormat()) {
-			int[] rnnz = new int[rlen];
-			for(ColGroup grp : _colGroups)
-				grp.countNonZerosPerRow(rnnz, 0, rlen);
-			ret.allocateSparseRowsBlock();
-			SparseBlock rows = ret.getSparseBlock();
-			for(int i = 0; i < rlen; i++)
-				rows.allocate(i, rnnz[i]);
-		}
+		MatrixBlock ret = new MatrixBlock(rlen, clen, false, -1);
+		ret.allocateDenseBlock();
+		// (nonZeros == -1) ?
+		// .allocateBlock() : new MatrixBlock(rlen, clen, sparse, nonZeros).allocateBlock();
+
+		// if(ret.isInSparseFormat()) {
+		// int[] rnnz = new int[rlen];
+		// // for(ColGroup grp : _colGroups)
+		// // grp.countNonZerosPerRow(rnnz, 0, rlen);
+		// ret.allocateSparseRowsBlock();
+		// SparseBlock rows = ret.getSparseBlock();
+		// for(int i = 0; i < rlen; i++)
+		// rows.allocate(i, rnnz[i]);
+		// }
 
 		// core decompression (append if sparse)
 		for(ColGroup grp : _colGroups)
-			grp.decompressToBlock(ret, 0, rlen);
+			grp.decompressToBlockSafe(ret, 0, rlen, 0, grp.getValues(), false);
 
 		// post-processing (for append in decompress)
 		if(ret.getNonZeros() == -1 || nonZeros == -1) {
@@ -217,6 +225,7 @@
 			.allocateBlock() : new MatrixBlock(rlen, clen, sparse, nonZeros).allocateBlock();
 		// multi-threaded decompression
 		nonZeros = 0;
+		boolean overlapping = isOverlapping();
 		try {
 			ExecutorService pool = CommonThreadPool.get(k);
 			int rlen = getNumRows();
@@ -225,7 +234,8 @@
 			blklen += (blklen % blkz != 0) ? blkz - blklen % blkz : 0;
 			ArrayList<DecompressTask> tasks = new ArrayList<>();
 			for(int i = 0; i < k & i * blklen < getNumRows(); i++)
-				tasks.add(new DecompressTask(_colGroups, ret, i * blklen, Math.min((i + 1) * blklen, rlen)));
+				tasks.add(
+					new DecompressTask(_colGroups, ret, i * blklen, Math.min((i + 1) * blklen, rlen), overlapping));
 			List<Future<Long>> rtasks = pool.invokeAll(tasks);
 			pool.shutdown();
 			for(Future<Long> rt : rtasks)
@@ -237,8 +247,12 @@
 			ex.printStackTrace();
 			return decompress();
 		}
-
-		ret.setNonZeros(nonZeros);
+		if(overlapping) {
+			ret.recomputeNonZeros();
+		}
+		else {
+			ret.setNonZeros(nonZeros);
+		}
 
 		if(DMLScript.STATISTICS || LOG.isDebugEnabled()) {
 			double t = time.stop();
@@ -366,9 +380,18 @@
 
 	public int[] countNonZerosPerRow(int rl, int ru) {
 		int[] rnnz = new int[ru - rl];
-		for(ColGroup grp : _colGroups)
-			grp.countNonZerosPerRow(rnnz, rl, ru);
-		return rnnz;
+		if(!isOverlapping()) {
+
+			for(ColGroup grp : _colGroups)
+				grp.countNonZerosPerRow(rnnz, rl, ru);
+			return rnnz;
+		}
+		else {
+			LOG.warn(
+				"Not good to calculate number of non zeros in segment when overlapping compressed returning as if fully dense");
+			Arrays.fill(rnnz, getNumColumns());
+			return rnnz;
+		}
 	}
 
 	@Override
@@ -383,8 +406,8 @@
 			return ret;
 		}
 
-		if(isOverlapping() && (!(sop.fn instanceof Multiply || sop.fn instanceof Divide 
-			|| sop.fn instanceof Plus || sop.fn instanceof Minus))) {
+		if(isOverlapping() && (!(sop.fn instanceof Multiply || sop.fn instanceof Divide || sop.fn instanceof Plus ||
+			sop.fn instanceof Minus))) {
 			LOG.warn("scalar overlapping not supported for op: " + sop.fn);
 			MatrixBlock m1d = decompress(sop.getNumThreads());
 			return m1d.scalarOperations(sop, result);
@@ -407,13 +430,13 @@
 
 		BinaryAccessType atype = LibMatrixBincell.getBinaryAccessType(this, that);
 
-		if(atype == BinaryAccessType.MATRIX_COL_VECTOR || atype == BinaryAccessType.MATRIX_MATRIX ) {
+		if(atype == BinaryAccessType.MATRIX_COL_VECTOR || atype == BinaryAccessType.MATRIX_MATRIX) {
 			MatrixBlock ret = LibBinaryCellOp.binaryMVPlusCol(this, that, op);
 			result = ret;
 			return ret;
 		}
-		else if(!(op.fn instanceof Multiply || op.fn instanceof Divide || op.fn instanceof Plus || op.fn instanceof Minus ||
-			op.fn instanceof MinusMultiply || op.fn instanceof PlusMultiply)) {
+		else if(!(op.fn instanceof Multiply || op.fn instanceof Divide || op.fn instanceof Plus ||
+			op.fn instanceof Minus || op.fn instanceof MinusMultiply || op.fn instanceof PlusMultiply)) {
 			LOG.warn("Decompressing since Binary Ops" + op.fn + " is not supported compressed");
 			MatrixBlock m2 = getUncompressed(this);
 			MatrixBlock ret = m2.binaryOperations(op, thatValue, result);
@@ -524,22 +547,104 @@
 	@Override
 	public MatrixBlock aggregateBinaryOperations(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret,
 		AggregateBinaryOperator op) {
+		return aggregateBinaryOperations(m1, m2, ret, op, false, false);
+	}
+
+	public MatrixBlock aggregateBinaryOperations(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret,
+		AggregateBinaryOperator op, boolean transposeLeft, boolean transposeRight) {
+
+		MatrixBlock that;
+		// Handle if the matrix block inputs are transposed, but not compressed
+		// in general this is safe to do, since the decompression would cost more than the transpose.
+		if(!(m1 instanceof CompressedMatrixBlock) && transposeLeft) {
+			ReorgOperator r_op = new ReorgOperator(SwapIndex.getSwapIndexFnObject(), op.getNumThreads());
+			m1 = m1.reorgOperations(r_op, new MatrixBlock(), 0, 0, 0);
+		}
+		else if(!(m2 instanceof CompressedMatrixBlock) && transposeRight) {
+			ReorgOperator r_op = new ReorgOperator(SwapIndex.getSwapIndexFnObject(), op.getNumThreads());
+			m2 = m2.reorgOperations(r_op, new MatrixBlock(), 0, 0, 0);
+		}
+		// Handle the case of both sides being compressed.
+		else if(m1 instanceof CompressedMatrixBlock && m2 instanceof CompressedMatrixBlock) {
+			// Both sides are compressed but none of them are transposed.
+			if(!transposeLeft && !transposeRight) {
+				// If neither side is transposed, decompress the right-hand side to enable compressed overlapping output.
+				LOG.warn("Matrix decompression from multiplying two compressed matrices.");
+				m2 = getUncompressed(m2);
+			}
+			else if(transposeLeft && !transposeRight) {
+				// Ideal situation: t(compressed) %*% compressed. Alternative cost heuristic (currently unused):
+				// if(m1.getNumColumns() * ((CompressedMatrixBlock) m1).getColGroups().size() < m2.getNumColumns() *
+				// ((CompressedMatrixBlock) m2).getColGroups().size()) {
+				if(m1.getNumColumns() > m2.getNumColumns()) {
+					LOG.error("case 1");
+					ret = LibLeftMultBy.leftMultByMatrix(((CompressedMatrixBlock) m1).getColGroups(),
+						m2,
+						ret,
+						true,
+						true,
+						m1.getNumColumns(),
+						((CompressedMatrixBlock) m1).isOverlapping(),
+						op.getNumThreads(),
+						((CompressedMatrixBlock) m1).getMaxNumValues());
+					ReorgOperator r_op = new ReorgOperator(SwapIndex.getSwapIndexFnObject(), op.getNumThreads());
+					ret = ret.reorgOperations(r_op, new MatrixBlock(), 0, 0, 0);
+					return ret;
+				}
+				else {
+					LOG.error("case 2");
+					return LibLeftMultBy.leftMultByMatrix(((CompressedMatrixBlock) m2).getColGroups(),
+						m1,
+						ret,
+						true,
+						true,
+						m2.getNumColumns(),
+						((CompressedMatrixBlock) m2).isOverlapping(),
+						op.getNumThreads(),
+						((CompressedMatrixBlock) m2).getMaxNumValues());
+
+				}
+			}
+			else if(!transposeLeft && transposeRight) {
+				throw new DMLCompressionException("Not Implemented compressed Matrix Mult, to produce larger matrix");
+				// Worst situation: the result blows up to rows(m1) x rows(m2), the full size of both compressed inputs.
+			}
+			else {
+				ret = aggregateBinaryOperations(m2, m1, ret, op);
+				ReorgOperator r_op = new ReorgOperator(SwapIndex.getSwapIndexFnObject(), op.getNumThreads());
+				return ret.reorgOperations(r_op, new MatrixBlock(), 0, 0, 0);
+			}
+		}
+		// Handle the case where the transpose applies to a compressed input.
+		// A full solution would store a "transposed" flag on the compressed matrix and
+		// honor that flag in every operation that reads the matrix.
+		// That would let us keep using the right matrix multiplication and preserve
+		// overlapping intermediates instead of transposing eagerly.
+		else if(m1 instanceof CompressedMatrixBlock && transposeLeft) {
+			LOG.warn("transposing inverse to avoid decompress because left hand side is compressed");
+			// change operation from t(m1) %*% m2 -> t( t(m2) %*% m1 )
+			ret = ((CompressedMatrixBlock) m1).aggregateBinaryOperations(m2, m1, ret, op, true, false);
+			ReorgOperator r_op = new ReorgOperator(SwapIndex.getSwapIndexFnObject(), op.getNumThreads());
+			return ret.reorgOperations(r_op, new MatrixBlock(), 0, 0, 0);
+		}
+		else if(m2 instanceof CompressedMatrixBlock && transposeRight) {
+			throw new DMLCompressionException("Not Implemented compressed right transpose matrix multiplication");
+		}
 
 		// setup meta data (dimensions, sparsity)
-
 		boolean right = (m1 == this);
-		MatrixBlock that = right ? m2 : m1;
+		that = right ? m2 : m1;
 		if(!right && m2 != this) {
 			throw new DMLRuntimeException(
 				"Invalid inputs for aggregate Binary Operation which expect either m1 or m2 to be equal to the object calling");
 		}
 
-		int rl = m1.getNumRows();
-		int cl = m2.getNumColumns();
-
 		// create output matrix block
 		if(right) {
-			return LibRightMultBy.rightMultByMatrix(_colGroups, that, ret, op.getNumThreads(), getMaxNumValues(), true);
+			boolean allowOverlap = ConfigurationManager.getDMLConfig()
+				.getBooleanValue(DMLConfig.COMPRESSED_OVERLAPPING);
+			return LibRightMultBy
+				.rightMultByMatrix(_colGroups, that, ret, op.getNumThreads(), getMaxNumValues(), allowOverlap);
 		}
 		else {
 			return LibLeftMultBy.leftMultByMatrix(_colGroups,
@@ -547,8 +652,7 @@
 				ret,
 				false,
 				true,
-				rl,
-				cl,
+				m2.getNumColumns(),
 				isOverlapping(),
 				op.getNumThreads(),
 				getMaxNumValues());
@@ -636,20 +740,38 @@
 
 		if(!isEmptyBlock(false)) {
 			// compute matrix mult
-			LibLeftMultBy.leftMultByTransposeSelf(_colGroups,
-				out,
-				0,
-				_colGroups.size(),
-				k,
-				getNumColumns(),
-				getMaxNumValues(),
-				isOverlapping());
+			LibLeftMultBy
+				.leftMultByTransposeSelf(_colGroups, out, k, getNumColumns(), getMaxNumValues(), isOverlapping());
 			// post-processing
 			out.setNonZeros(LinearAlgebraUtils.copyUpperToLowerTriangle(out));
 		}
 		return out;
 	}
 
+	@Override
+	public MatrixBlock replaceOperations(MatrixValue result, double pattern, double replacement) {
+		// if(Double.isNaN(pattern)) {
+		// LOG.debug("Skipping replace op because nan is not posible for compressed matrices");
+		// result = this;
+		// return this;
+		// }
+		// else {
+
+		printDecompressWarning("replaceOperations " + pattern + "  -> " + replacement);
+		LOG.error("Overlapping? : " + isOverlapping());
+		MatrixBlock tmp = getUncompressed(this);
+		return tmp.replaceOperations(result, pattern, replacement);
+		// }
+	}
+
+	@Override
+	public MatrixBlock reorgOperations(ReorgOperator op, MatrixValue ret, int startRow, int startColumn, int length) {
+		printDecompressWarning(op.getClass().getSimpleName() + " -- " + op.fn.getClass().getSimpleName());
+		LOG.error("transposeSize:" + this.getNumRows() + "  " + this.getNumColumns());
+		MatrixBlock tmp = decompress(op.getNumThreads());
+		return tmp.reorgOperations(op, ret, startRow, startColumn, length);
+	}
+
 	public boolean hasUncompressedColGroup() {
 		return getUncompressedColGroup() != null;
 	}
@@ -690,19 +812,21 @@
 		private final MatrixBlock _ret;
 		private final int _rl;
 		private final int _ru;
+		private final boolean _overlapping;
 
-		protected DecompressTask(List<ColGroup> colGroups, MatrixBlock ret, int rl, int ru) {
+		protected DecompressTask(List<ColGroup> colGroups, MatrixBlock ret, int rl, int ru, boolean overlapping) {
 			_colGroups = colGroups;
 			_ret = ret;
 			_rl = rl;
 			_ru = ru;
+			_overlapping = overlapping;
 		}
 
 		@Override
 		public Long call() {
 
 			// preallocate sparse rows to avoid repeated alloc
-			if(_ret.isInSparseFormat()) {
+			if(!_overlapping && _ret.isInSparseFormat()) {
 				int[] rnnz = new int[_ru - _rl];
 				for(ColGroup grp : _colGroups)
 					grp.countNonZerosPerRow(rnnz, _rl, _ru);
@@ -713,13 +837,13 @@
 
 			// decompress row partition
 			for(ColGroup grp : _colGroups)
-				grp.decompressToBlock(_ret, _rl, _ru);
+				grp.decompressToBlockSafe(_ret, _rl, _ru, _rl, grp.getValues(), false);
 
 			// post processing (sort due to append)
 			if(_ret.isInSparseFormat())
 				_ret.sortSparseRows(_rl, _ru);
 
-			return _ret.recomputeNonZeros(_rl, _ru - 1);
+			return _overlapping ? 0 : _ret.recomputeNonZeros(_rl, _ru - 1);
 		}
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
index 32ae51f..0bc5aa8 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
@@ -27,6 +27,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.runtime.DMLCompressionException;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder;
 import org.apache.sysds.runtime.compress.colgroup.ColGroup;
@@ -46,11 +47,10 @@
 public class CompressedMatrixBlockFactory {
 
 	private static final Log LOG = LogFactory.getLog(CompressedMatrixBlockFactory.class.getName());
-	private static final CompressionSettings defaultCompressionSettings = new CompressionSettingsBuilder().create();
 
 	public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb) {
 		// Default sequential execution of compression
-		return compress(mb, 1, defaultCompressionSettings);
+		return compress(mb, 1, new CompressionSettingsBuilder().create());
 	}
 
 	public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb,
@@ -59,9 +59,10 @@
 	}
 
 	public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, int k) {
-		return compress(mb, k, defaultCompressionSettings);
+		return compress(mb, k, new CompressionSettingsBuilder().create());
 	}
 
+
 	/**
 	 * The main method for compressing the input matrix.
 	 * 
@@ -85,40 +86,36 @@
 		}
 
 		Timing time = new Timing(true);
+
 		CompressionStatistics _stats = new CompressionStatistics();
+		CompressedMatrixBlock res = null;
 
 		// Prepare basic meta data and deep copy / transpose input
 		int numRows = mb.getNumRows();
 		int numCols = mb.getNumColumns();
-		boolean sparse = mb.isInSparseFormat();
-
-		// Transpose the MatrixBlock if the TransposeInput flag is set.
-		// This gives better cache consciousness, at a small upfront cost.
-		MatrixBlock rawBlock = !compSettings.transposeInput ? new MatrixBlock(mb) : LibMatrixReorg
-			.transpose(mb, new MatrixBlock(numCols, numRows, sparse), k);
-
-		// Construct sample-based size estimator
-		CompressedSizeEstimator sizeEstimator = CompressedSizeEstimatorFactory.getSizeEstimator(rawBlock, compSettings);
+		int phase = 0;
 
 		// --------------------------------------------------
-		// PHASE 1: Classify columns by compression type
+		// PHASE : Classify columns by compression type
 		// Start by determining which columns are amenable to compression
 
 		// Classify columns according to ratio (size uncompressed / size compressed),
 		// where a column is compressible if ratio > 1.
-
+		MatrixBlock shallowCopy = new MatrixBlock().copyShallow(mb);
+		// Construct sample-based size estimator
+		CompressedSizeEstimator sizeEstimator = CompressedSizeEstimatorFactory
+			.getSizeEstimator(shallowCopy, compSettings, false);
 		CompressedSizeInfo sizeInfos = sizeEstimator.computeCompressedSizeInfos(k);
 
 		if(compSettings.investigateEstimate)
 			_stats.estimatedSizeCols = sizeInfos.memoryEstimate();
 
 		_stats.setNextTimePhase(time.stop());
-		if (DMLScript.STATISTICS ){
+		if(DMLScript.STATISTICS) {
 			DMLCompressionStatistics.addCompressionTime(_stats.getLastTimePhase(), 1);
 		}
-		if(LOG.isDebugEnabled()){
-			LOG.debug("Compression statistics:");
-			LOG.debug("--compression phase 1: " + _stats.getLastTimePhase());
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("--compression phase " + phase++ + " Classify  : " + _stats.getLastTimePhase());
 		}
 
 		if(sizeInfos.colsC.isEmpty()) {
@@ -126,49 +123,77 @@
 			return new ImmutablePair<>(new MatrixBlock().copyShallow(mb), _stats);
 		}
 		// --------------------------------------------------
-
+		if(sizeInfos.colsC.size() != mb.getNumColumns()){
+			throw new DMLCompressionException("Invalid number of columns is:" +  sizeInfos.colsC.size() + " and should be: " + mb.getNumColumns());
+		}
 		// --------------------------------------------------
-		// PHASE 2: Grouping columns
+		// PHASE : Grouping columns
 		// Divide the columns into column groups.
 		List<int[]> coCodeColGroups = PlanningCoCoder
 			.findCoCodesByPartitioning(sizeEstimator, sizeInfos, numRows, k, compSettings);
 		_stats.setNextTimePhase(time.stop());
-		if (DMLScript.STATISTICS ){
+		if(DMLScript.STATISTICS) {
 			DMLCompressionStatistics.addCompressionTime(_stats.getLastTimePhase(), 2);
 		}
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("--compression phase 2: " + _stats.getLastTimePhase());
+			LOG.debug("--compression phase " + phase++ + " Grouping  : " + _stats.getLastTimePhase());
 			StringBuilder sb = new StringBuilder();
 			for(int[] group : coCodeColGroups)
 				sb.append(Arrays.toString(group));
 			LOG.debug(sb.toString());
 		}
-
-		// TODO: Make second estimate of memory usage if the ColGroups are as above?
-		// This should already be done inside the PlanningCoCoder, and therefore this information
-		// should be returned there, and not estimated twice.
-		// if(INVESTIGATE_ESTIMATES) {
-		// _stats.estimatedSizeColGroups = memoryEstimateIfColsAre(coCodeColGroups);
-		// }
 		// --------------------------------------------------
 
+		// Heuristic to decide if we should transpose the entire matrix input.
+		switch(compSettings.transposeInput) {
+			case "true":
+				compSettings.transposed = true;
+				break;
+			case "false":
+				compSettings.transposed = false;
+				break;
+			default:
+				compSettings.transposed = numRows > 1000000 || coCodeColGroups.size() > numCols / 2;
+		}
+
+		// -------------------------------------------------
+		// PHASE : transpose input matrix
+		// Transpose the matrix, to give more cache friendly access to reading row by row values.
+
+		// Transpose the MatrixBlock if the TransposeInput flag is set.
+		// This gives better cache consciousness, at a small upfront cost.
+
+		boolean sparse = mb.isInSparseFormat();
+		MatrixBlock rawBlock = compSettings.transposed ? LibMatrixReorg.transpose(mb,
+			new MatrixBlock(numCols, numRows, sparse),
+			k) : new MatrixBlock(numRows, numCols, sparse).copyShallow(mb);
+
+		_stats.setNextTimePhase(time.stop());
+		if(DMLScript.STATISTICS) {
+			DMLCompressionStatistics.addCompressionTime(_stats.getLastTimePhase(), 0);
+		}
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("Compression statistics:");
+			LOG.debug("--compression phase " + phase++ + " Transpose : " + _stats.getLastTimePhase());
+		}
+
 		// --------------------------------------------------
 		// PHASE 3: Compress and correct sample-based decisions
 		ColGroup[] colGroups = ColGroupFactory
 			.compressColGroups(rawBlock, sizeInfos.compRatios, coCodeColGroups, compSettings, k);
 
 		// Make Compression happen!
-		CompressedMatrixBlock res = new CompressedMatrixBlock(mb);
+		res = new CompressedMatrixBlock(mb);
 		List<ColGroup> colGroupList = ColGroupFactory.assignColumns(numCols, colGroups, rawBlock, compSettings);
 		res.allocateColGroupList(colGroupList);
 		_stats.setNextTimePhase(time.stop());
-		if (DMLScript.STATISTICS ){
+		if(DMLScript.STATISTICS) {
 			DMLCompressionStatistics.addCompressionTime(_stats.getLastTimePhase(), 3);
 		}
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("Hash overlap count:" + DblArrayIntListHashMap.hashMissCount);
+			LOG.debug("--compression phase " + phase++ + " Compress  : " + _stats.getLastTimePhase());
+			LOG.debug("--compression Hash collisions:" + DblArrayIntListHashMap.hashMissCount);
 			DblArrayIntListHashMap.hashMissCount = 0;
-			LOG.debug("--compression phase 3: " + _stats.getLastTimePhase());
 		}
 		// --------------------------------------------------
 
@@ -181,11 +206,11 @@
 		// res._sharedDDC1Dict = true;
 		// }
 		_stats.setNextTimePhase(time.stop());
-		if (DMLScript.STATISTICS ){
+		if(DMLScript.STATISTICS) {
 			DMLCompressionStatistics.addCompressionTime(_stats.getLastTimePhase(), 4);
 		}
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("--compression phase 4: " + _stats.getLastTimePhase());
+			LOG.debug("--compression phase " + phase++ + " Share     : " + _stats.getLastTimePhase());
 		}
 		// --------------------------------------------------
 
@@ -197,6 +222,8 @@
 		_stats.ratio = _stats.originalSize / (double) _stats.size;
 
 		if(_stats.ratio < 1) {
+			LOG.info("--compressed size: " + _stats.size);
+			LOG.info("--compression ratio: " + _stats.ratio);
 			LOG.info("Abort block compression because compression ratio is less than 1.");
 			return new ImmutablePair<>(new MatrixBlock().copyShallow(mb), _stats);
 		}
@@ -208,27 +235,33 @@
 		_stats.setNextTimePhase(time.stop());
 		_stats.setColGroupsCounts(colGroupList);
 
-		if (DMLScript.STATISTICS ){
+		if(DMLScript.STATISTICS) {
 			DMLCompressionStatistics.addCompressionTime(_stats.getLastTimePhase(), 5);
 		}
 		if(LOG.isDebugEnabled()) {
 			LOG.debug("--num col groups: " + colGroupList.size() + ", -- num input cols: " + numCols);
-			LOG.debug("--compression phase 5: " + _stats.getLastTimePhase());
+			LOG.debug("--compression phase " + phase++ + " Cleanup   : " + _stats.getLastTimePhase());
 			LOG.debug("--col groups types " + _stats.getGroupsTypesString());
 			LOG.debug("--col groups sizes " + _stats.getGroupsSizesString());
 			LOG.debug("--compressed size: " + _stats.size);
 			LOG.debug("--compression ratio: " + _stats.ratio);
+			int[] lengths = new int[colGroupList.size()];
+			int i = 0;
+			for(ColGroup colGroup : colGroupList) {
+				if(colGroup.getValues() != null)
+					lengths[i++] = colGroup.getValues().length / colGroup.getColIndices().length;
+			}
+			LOG.debug("--compressed colGroup dictionary sizes: " + Arrays.toString(lengths));
 
 			if(LOG.isTraceEnabled()) {
 				for(ColGroup colGroup : colGroupList) {
 					LOG.trace("--colGroups colIndexes : " + Arrays.toString(colGroup.getColIndices()));
 					LOG.trace("--colGroups type       : " + colGroup.getClass().getSimpleName());
-					LOG.trace("--colGroups Values     : " + Arrays.toString(colGroup.getValues()));
+					// LOG.trace("--colGroups Values : " + Arrays.toString(colGroup.getValues()));
 				}
 			}
 		}
 
-		
 		return new ImmutablePair<>(res, _stats);
 		// --------------------------------------------------
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
index 901e883..0a654ed 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
@@ -49,11 +49,14 @@
 	/** Share DDC Dictionaries between ColGroups. */
 	public final boolean allowSharedDictionary;
 
+	/** String specifying the transpose setting to use; one of "auto", "true" or "false". */
+	public final String transposeInput;
+
 	/**
-	 * Transpose input matrix, to optimize performance, this reallocate the matrix to a more cache conscious allocation
-	 * for iteration in columns.
+	 * Transpose input matrix, to optimize access when extracting bitmaps.
+	 * This setting is changed inside the script based on the transposeInput setting.
 	 */
-	public final boolean transposeInput;
+	public boolean transposed = false;
 
 	/**
 	 * Boolean specifying if the OLE and RLE should construct skip to enable skipping large amounts of rows.
@@ -82,7 +85,7 @@
 	 */
 	public final EnumSet<CompressionType> validCompressions;
 
-	protected CompressionSettings(double samplingRatio, boolean allowSharedDictionary, boolean transposeInput,
+	protected CompressionSettings(double samplingRatio, boolean allowSharedDictionary, String transposeInput,
 		boolean skipList, int seed, boolean investigateEstimate, boolean lossy,
 		EnumSet<CompressionType> validCompressions, boolean sortValuesByLength, PartitionerType columnPartitioner,
 		int maxStaticColGroupCoCode) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
index 43f45d1..92697de 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
@@ -23,6 +23,7 @@
 
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.conf.DMLConfig;
+import org.apache.sysds.runtime.DMLCompressionException;
 import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder.PartitionerType;
 import org.apache.sysds.runtime.compress.colgroup.ColGroup.CompressionType;
 
@@ -30,18 +31,17 @@
  * Builder pattern for Compression Settings. See CompressionSettings for details on values.
  */
 public class CompressionSettingsBuilder {
-	private double samplingRatio = 1.0;
+	private double samplingRatio;
 	private boolean allowSharedDictionary = false;
-	private boolean transposeInput = true;
+	private String transposeInput;
 	private boolean skipList = true;
 	private int seed = -1;
 	private boolean investigateEstimate = false;
 	private boolean lossy = false;
 	private EnumSet<CompressionType> validCompressions;
 	private boolean sortValuesByLength = false;
-	private PartitionerType columnPartitioner = PartitionerType.COST;
-	// private PartitionerType columnPartitioner = PartitionerType.STATIC;
-	private int maxStaticColGroupCoCode = 1;
+	private PartitionerType columnPartitioner;
+	private int maxStaticColGroupCoCode = 10;
 
 	public CompressionSettingsBuilder() {
 
@@ -49,10 +49,13 @@
 		this.lossy = conf.getBooleanValue(DMLConfig.COMPRESSED_LOSSY);
 		this.validCompressions = EnumSet.of(CompressionType.UNCOMPRESSED);
 		String[] validCompressionsString = conf.getTextValue(DMLConfig.COMPRESSED_VALID_COMPRESSIONS).split(",");
-		;
 		for(String comp : validCompressionsString) {
 			validCompressions.add(CompressionType.valueOf(comp));
 		}
+		samplingRatio = conf.getDoubleValue(DMLConfig.COMPRESSED_SAMPLING_RATIO);
+		columnPartitioner = PartitionerType.valueOf(conf.getTextValue(DMLConfig.COMPRESSED_COCODE));
+
+		transposeInput = conf.getTextValue(DMLConfig.COMPRESSED_TRANSPOSE);
 	}
 
 	/**
@@ -120,11 +123,19 @@
 	 * Specify if the input matrix should be transposed before compression. This improves cache efficiency while
 	 * compression the input matrix
 	 * 
-	 * @param transposeInput boolean specifying if the input should be transposed before compression
+	 * @param transposeInput string specifying if the input should be transposed before compression, should be one of "auto", "true" or "false"
 	 * @return The CompressionSettingsBuilder
 	 */
-	public CompressionSettingsBuilder setTransposeInput(boolean transposeInput) {
-		this.transposeInput = transposeInput;
+	public CompressionSettingsBuilder setTransposeInput(String transposeInput) {
+		switch(transposeInput){
+			case "auto":
+			case "true":
+			case "false":
+				this.transposeInput = transposeInput;
+				break;
+			default:
+				throw new DMLCompressionException("Invalid transpose technique");
+		}
 		return this;
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java
index 48093eb..36fbda1 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java
@@ -49,7 +49,7 @@
 	}
 
 	/**
-	 * Set array of counts regarding col group types. 
+	 * Set array of counts regarding col group types.
 	 * 
 	 * The position corresponds with the enum ordinal.
 	 * 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelection.java b/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelection.java
index b8cdd03..724d118 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelection.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelection.java
@@ -33,13 +33,10 @@
 
 	private DblArray nonZeroReturn;
 
-	protected CompressionSettings _compSettings;
-
-	protected ReaderColumnSelection(int[] colIndexes, int numRows, CompressionSettings compSettings) {
+	protected ReaderColumnSelection(int[] colIndexes, int numRows) {
 		_colIndexes = colIndexes;
 		_numRows = numRows;
 		_lastRow = -1;
-		_compSettings = compSettings;
 	}
 
 	/**
diff --git a/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionBitSet.java b/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionBitSet.java
index 8f4aeff..2fb380f 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionBitSet.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionBitSet.java
@@ -29,8 +29,8 @@
 	private DblArray reusableReturn;
 	private double[] reusableArr;
 
-	public ReaderColumnSelectionBitSet(BitSet data, int rows, int[] colIndices, CompressionSettings compSettings) {
-		super(colIndices, rows, compSettings);
+	public ReaderColumnSelectionBitSet(BitSet data, int rows, int[] colIndices) {
+		super(colIndices, rows);
 		_data = data;
 		reusableArr = new double[colIndices.length];
 		reusableReturn = new DblArray(reusableArr);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionDense.java b/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionDense.java
index d07b863..5ac93df 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionDense.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionDense.java
@@ -28,21 +28,19 @@
 	private DblArray reusableReturn;
 	private double[] reusableArr;
 
-	public ReaderColumnSelectionDense(MatrixBlock data, int[] colIndices, CompressionSettings compSettings) {
-		super(colIndices, compSettings.transposeInput ? data.getNumColumns() : data.getNumRows(), compSettings);
+	public ReaderColumnSelectionDense(MatrixBlock data, int[] colIndices) {
+		super(colIndices, data.getNumRows());
 		_data = data;
 		reusableArr = new double[colIndices.length];
 		reusableReturn = new DblArray(reusableArr);
 	}
 
-
 	protected DblArray getNextRow() {
 		if(_lastRow == _numRows - 1)
 			return null;
 		_lastRow++;
 		for(int i = 0; i < _colIndexes.length; i++) {
-			reusableArr[i] = _compSettings.transposeInput ? _data.quickGetValue(_colIndexes[i], _lastRow) : _data
-				.quickGetValue(_lastRow, _colIndexes[i]);
+			reusableArr[i] = _data.quickGetValue(_lastRow, _colIndexes[i]);
 		}
 		return reusableReturn;
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionDenseSample.java b/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionDenseTransposed.java
similarity index 61%
rename from src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionDenseSample.java
rename to src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionDenseTransposed.java
index bb314f2..b815bc5 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionDenseSample.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionDenseTransposed.java
@@ -22,39 +22,26 @@
 import org.apache.sysds.runtime.compress.utils.DblArray;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
-/** considers only a subset of row indexes */
-public class ReaderColumnSelectionDenseSample extends ReaderColumnSelection {
+public class ReaderColumnSelectionDenseTransposed extends ReaderColumnSelection {
 	protected MatrixBlock _data;
 
-	private int[] _sampleIndexes;
-	private int lastIndex = -1;
-
-	// reusable return
 	private DblArray reusableReturn;
 	private double[] reusableArr;
 
-	public ReaderColumnSelectionDenseSample(MatrixBlock data, int[] colIndexes, int[] sampleIndexes,
-		CompressionSettings compSettings) {
-		super(colIndexes, -1, compSettings);
+	public ReaderColumnSelectionDenseTransposed(MatrixBlock data, int[] colIndices) {
+		super(colIndices, data.getNumColumns() );
 		_data = data;
-		_sampleIndexes = sampleIndexes;
-		reusableArr = new double[colIndexes.length];
+		reusableArr = new double[colIndices.length];
 		reusableReturn = new DblArray(reusableArr);
 	}
 
 	protected DblArray getNextRow() {
-		if(lastIndex == _sampleIndexes.length - 1)
+		if(_lastRow == _numRows - 1)
 			return null;
-		lastIndex++;
+		_lastRow++;
 		for(int i = 0; i < _colIndexes.length; i++) {
-			reusableArr[i] = _compSettings.transposeInput ? _data.quickGetValue(_colIndexes[i],
-				_sampleIndexes[lastIndex]) : _data.quickGetValue(_sampleIndexes[lastIndex], _colIndexes[i]);
+			reusableArr[i] = _data.quickGetValue(_colIndexes[i], _lastRow);
 		}
 		return reusableReturn;
 	}
-
-	@Override
-	public int getCurrentRowIndex() {
-		return _sampleIndexes[lastIndex];
-	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionSparse.java b/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionSparse.java
index d560a8d..378543d 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionSparse.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionSparse.java
@@ -19,10 +19,8 @@
 
 package org.apache.sysds.runtime.compress;
 
-import java.util.Arrays;
-
 import org.apache.sysds.runtime.compress.utils.DblArray;
-import org.apache.sysds.runtime.data.SparseRow;
+import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
 /**
@@ -40,53 +38,56 @@
 	// an empty array to return if the entire row was 0.
 	private DblArray empty = new DblArray();
 
-	// current sparse row positions
-	private SparseRow[] sparseCols = null;
-	private int[] sparsePos = null;
+	private SparseBlock a;
 
 	/**
 	 * Reader of sparse matrix blocks for compression.
 	 * 
-	 * This reader should not be used if the input data is not transposed and sparse
+	 * This reader should not be used if the input data is not sparse
 	 * 
 	 * @param data         The transposed and sparse matrix
 	 * @param colIndexes   The column indexes to combine
-	 * @param compSettings The compression settings.
 	 */
-	public ReaderColumnSelectionSparse(MatrixBlock data, int[] colIndexes, CompressionSettings compSettings) {
-		super(colIndexes, compSettings.transposeInput ? data.getNumColumns() : data.getNumRows(), compSettings);
+	public ReaderColumnSelectionSparse(MatrixBlock data, int[] colIndexes) {
+		super(colIndexes, data.getNumRows());
 		reusableArr = new double[colIndexes.length];
 		reusableReturn = new DblArray(reusableArr);
-
-		sparseCols = new SparseRow[colIndexes.length];
-		sparsePos = new int[colIndexes.length];
-		if(data.getSparseBlock() != null)
-			for(int i = 0; i < colIndexes.length; i++)
-				sparseCols[i] = data.getSparseBlock().get(colIndexes[i]);
+		a = data.getSparseBlock();
 	}
 
 	protected DblArray getNextRow() {
 		if(_lastRow == _numRows - 1) {
-
 			return null;
 		}
+
 		_lastRow++;
 
-		// move pos to current row if necessary (for all columns)
-		for(int i = 0; i < _colIndexes.length; i++)
-			if(sparseCols[i] != null &&
-				(sparseCols[i].indexes().length <= sparsePos[i] || sparseCols[i].indexes()[sparsePos[i]] < _lastRow)) {
-				sparsePos[i]++;
-			}
-		// extract current values
-		Arrays.fill(reusableArr, 0);
 		boolean zeroResult = true;
-		for(int i = 0; i < _colIndexes.length; i++)
-			if(sparseCols[i] != null && sparseCols[i].indexes().length > sparsePos[i] &&
-				sparseCols[i].indexes()[sparsePos[i]] == _lastRow) {
-				reusableArr[i] = sparseCols[i].values()[sparsePos[i]];
-				zeroResult = false;
+
+		if(a != null && !a.isEmpty(_lastRow)) {
+
+			int apos = a.pos(_lastRow);
+			int alen = a.size(_lastRow) + apos;
+			int[] aix = a.indexes(_lastRow);
+			double[] avals = a.values(_lastRow);
+			int skip = 0;
+			int j = apos;
+
+			while(skip < _colIndexes.length && j < alen) {
+				if(_colIndexes[skip] == aix[j]) {
+					reusableArr[skip++] = avals[j++];
+					zeroResult = false;
+				}
+				else if(_colIndexes[skip] > aix[j]) {
+					j++;
+				}
+				else {
+					reusableArr[skip++] = 0;
+				}
+
 			}
+		}
+
 		return zeroResult ? empty : reusableReturn;
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionSparseTransposed.java b/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionSparseTransposed.java
new file mode 100644
index 0000000..3670906
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/ReaderColumnSelectionSparseTransposed.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress;
+
+import org.apache.sysds.runtime.compress.utils.DblArray;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+
+/**
+ * Used to extract the values at certain indexes from each row in a sparse matrix
+ * 
+ * Keeps returning all-zeros arrays until reaching the last possible index. The current compression algorithm treats the
+ * zero-value in a sparse matrix like any other value.
+ */
public class ReaderColumnSelectionSparseTransposed extends ReaderColumnSelection {

	// reusable return; the backing array is overwritten on every call to getNextRow
	private DblArray reusableReturn;
	private double[] reusableArr;

	// an empty array to return if the entire row was 0.
	private DblArray empty = new DblArray();

	// The transposed sparse input: each sparse row of 'a' holds one logical column of the matrix.
	private SparseBlock a;
	// current sparse skip positions.
	// sparsePos[i] is the cursor into column i's sparse row; -1 marks a fully consumed (or empty) column.
	private int[] sparsePos = null;

	/**
	 * Reader of sparse matrix blocks for compression.
	 * 
	 * This reader should not be used if the input data is not transposed and sparse
	 * 
	 * @param data       The transposed and sparse matrix
	 * @param colIndexes The column indexes to combine
	 */
	public ReaderColumnSelectionSparseTransposed(MatrixBlock data, int[] colIndexes) {
		// numRows of the reader is the column count of the transposed input.
		super(colIndexes, data.getNumColumns());
		reusableArr = new double[colIndexes.length];
		reusableReturn = new DblArray(reusableArr);

		sparsePos = new int[colIndexes.length];

		a = data.getSparseBlock();

		// Initialize one cursor per selected column at the start of that column's sparse row.
		if(data.getSparseBlock() != null)
			for(int i = 0; i < colIndexes.length; i++) {
				if(a.isEmpty(_colIndexes[i]))
					// Use -1 to indicate that this column is done.
					sparsePos[i] = -1;
				else {
					sparsePos[i] = a.pos(_colIndexes[i]);
				}

			}
	}

	/**
	 * Advance to the next logical row and gather the value of each selected column at that row.
	 * 
	 * The returned DblArray reuses the same backing array between calls; callers must copy if needed.
	 * 
	 * @return the next row, the shared empty instance when the row is all zero, or null when exhausted.
	 */
	protected DblArray getNextRow() {
		if(_lastRow == _numRows - 1) {
			return null;
		}
		_lastRow++;

		boolean zeroResult = true;
		for(int i = 0; i < _colIndexes.length; i++) {
			int colidx = _colIndexes[i];
			if(sparsePos[i] != -1) {
				int apos = a.pos(colidx);
				int alen = a.size(colidx) + apos;
				int[] aix = a.indexes(colidx);
				double[] avals = a.values(colidx);
				// Advance the cursor past entries belonging to earlier rows.
				while(sparsePos[i] < alen && aix[sparsePos[i]] < _lastRow) {
					sparsePos[i] += 1;
				}

				if(sparsePos[i] >= alen) {
					// Mark this column as done.
					// (reusableArr[i] stays 0 on all later rows since the cursor never re-enters this branch)
					sparsePos[i] = -1;
					reusableArr[i] = 0;
				}
				else if(aix[sparsePos[i]] == _lastRow) {
					// Exact hit: this column has a non-zero entry at the current row.
					reusableArr[i] = avals[sparsePos[i]];
					zeroResult = false;
				}
				else {
					// Next entry is in a future row; the current row's value for this column is zero.
					reusableArr[i] = 0;
				}
			}
		}

		return zeroResult ? empty : reusableReturn;
	}
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/ColumnGroupPartitionerCost.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/ColumnGroupPartitionerCost.java
index 2f8221d..1b19631 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/ColumnGroupPartitionerCost.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/ColumnGroupPartitionerCost.java
@@ -30,6 +30,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.DMLCompressionException;
 import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder.GroupableColInfo;
 
@@ -48,10 +49,12 @@
 	@Override
 	public List<int[]> partitionColumns(List<Integer> groupCols, HashMap<Integer, GroupableColInfo> groupColsInfo,
 		CompressionSettings cs) {
-
+		if(groupCols.size() > 1000)
+			throw new DMLCompressionException("I think it is an invalid number of column groups.");
+			
 		TreeMap<Integer, Queue<Queue<Integer>>> distToColId = new TreeMap<>();
 		for(Entry<Integer, GroupableColInfo> ent : groupColsInfo.entrySet()) {
-			int distinct = ent.getValue().nrDistinct;
+			int distinct = (ent.getValue().nrDistinct > 1) ? ent.getValue().nrDistinct : 1;
 			if(distToColId.containsKey(distinct)) {
 				Queue<Integer> cocodeGroup = new LinkedList<>();
 				cocodeGroup.add(ent.getKey());
@@ -67,41 +70,30 @@
 		}
 
 		boolean change = false;
+
 		while(distToColId.firstKey() < largestDistinct) {
 			Entry<Integer, Queue<Queue<Integer>>> elm = distToColId.pollFirstEntry();
 			if(elm.getValue().size() > 1) {
-				int distinctCombinations = elm.getKey()>0 ? elm.getKey() : 1;
-				Queue<Queue<Integer>> group = elm.getValue();
-				int size = group.size();
-				if(Math.pow(distinctCombinations, size) < largestDistinct) {
-					Queue<Integer> t = elm.getValue().stream().reduce(new LinkedList<>(), (acc, e) -> {
-						acc.addAll(e);
-						return acc;
-					});
-					elm.getValue().clear();
-					if(distToColId.containsKey((int) Math.pow(distinctCombinations, size))){
-						distToColId.get((int) Math.pow(distinctCombinations, size)).add(t);
-					}else{
-						elm.getValue().add(t);
-						distToColId.put((int) Math.pow(distinctCombinations, size), elm.getValue());
-					}
-					change = true;
-				}
-				else if(distinctCombinations * distinctCombinations < largestDistinct) {
+				int distinctCombinations = elm.getKey() > 1 ? elm.getKey() : 2;
+				// Queue<Queue<Integer>> group = elm.getValue();
+				int sizeCombined = (int) (distinctCombinations * distinctCombinations);
+				if(sizeCombined < largestDistinct) {
 					Queue<Integer> cols = elm.getValue().poll();
 					cols.addAll(elm.getValue().poll());
-					if(distToColId.containsKey(distinctCombinations * distinctCombinations)) {
-						Queue<Queue<Integer>> p = distToColId.get(distinctCombinations * distinctCombinations);
+					if(distToColId.containsKey(sizeCombined)) {
+						Queue<Queue<Integer>> p = distToColId.get(sizeCombined);
 						p.add(cols);
 					}
 					else {
 						Queue<Queue<Integer>> n = new LinkedList<>();
 						n.add(cols);
-						distToColId.put(distinctCombinations * distinctCombinations, n);
+						distToColId.put(sizeCombined, n);
 					}
+
 					if(elm.getValue().size() > 0) {
 						distToColId.put(elm.getKey(), elm.getValue());
 					}
+
 					change = true;
 				}
 				else {
@@ -111,28 +103,29 @@
 			}
 			else if(!distToColId.isEmpty()) {
 				Entry<Integer, Queue<Queue<Integer>>> elm2 = distToColId.pollFirstEntry();
-				int size1 = elm.getKey()>0 ? elm.getKey() : 1;
-				int size2 = elm2.getKey()>0 ? elm2.getKey() : 1;
-				if(size1 * size2 < largestDistinct) {
+				int size1 = elm.getKey() > 1 ? elm.getKey() : 2;
+				int size2 = elm2.getKey() > 1 ? elm2.getKey() : 2;
+				int sizeCombined = (int) (size1 * size2 );
+				if(sizeCombined < largestDistinct) {
 					Queue<Integer> cols = elm.getValue().poll();
 					cols.addAll(elm2.getValue().poll());
-					if(elm2.getKey() == size1 * size2){
-						elm2.getValue().add(cols);
-					}
-					else if(distToColId.containsKey(size1 * size2)) {
-						distToColId.get(size1 * size2).add(cols);
+					if(distToColId.containsKey(sizeCombined)) {
+						distToColId.get(sizeCombined).add(cols);
 					}
 					else {
 						Queue<Queue<Integer>> n = new LinkedList<>();
 						n.add(cols);
-						distToColId.put(size1 * size2, n);
+						distToColId.put(sizeCombined, n);
 					}
+
 					if(elm.getValue().size() > 0) {
 						distToColId.put(elm.getKey(), elm.getValue());
 					}
+
 					if(elm2.getValue().size() > 0) {
 						distToColId.put(elm2.getKey(), elm2.getValue());
 					}
+
 					change = true;
 				}
 				else {
@@ -160,10 +153,11 @@
 				ret.add(g);
 			}
 
-		if(LOG.isDebugEnabled()){
+		if(LOG.isDebugEnabled()) {
 			StringBuilder sb = new StringBuilder();
 			for(int[] cg : ret)
 				sb.append(Arrays.toString(cg));
+
 			LOG.debug(sb.toString());
 		}
 		return ret;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCoder.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCoder.java
index 6071ecd..a4347df 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCoder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCoder.java
@@ -70,7 +70,7 @@
 		for(int i = 0; i < numCols; i++) {
 			int colIx = cols.get(i);
 			int cardinality = colGroups[colIx].getEstCard();
-			double weight = ((double)cardinality) / numRows;
+			double weight = ((double) cardinality) / numRows;
 			groupCols.add(colIx);
 			groupColsInfo.put(colIx, new GroupableColInfo(weight, colGroups[colIx].getMinSize(), cardinality));
 		}
@@ -78,8 +78,8 @@
 		// use column group partitioner to create partitions of columns
 		List<int[]> bins = createColumnGroupPartitioner(cs.columnPartitioner)
 			.partitionColumns(groupCols, groupColsInfo, cs);
-		
-		if (cs.columnPartitioner == PartitionerType.COST){
+
+		if(cs.columnPartitioner == PartitionerType.COST) {
 			return bins;
 		}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCodingGroup.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCodingGroup.java
index 1bf1332..a2f47a1 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCodingGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCodingGroup.java
@@ -62,7 +62,7 @@
 
 		// estimating size info
 		CompressedSizeInfoColGroup groupSizeInfo = estim.estimateCompressedColGroupSize(_colIndexes);
-		
+
 		_estSize = groupSizeInfo.getMinSize();
 		_cardRatio = groupSizeInfo.getEstCard() / numRows;
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ADictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ADictionary.java
index 67df821..00977ca 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ADictionary.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ADictionary.java
@@ -26,7 +26,6 @@
 import org.apache.sysds.runtime.functionobjects.Builtin;
 import org.apache.sysds.runtime.functionobjects.KahanFunction;
 import org.apache.sysds.runtime.functionobjects.ValueFunction;
-import org.apache.sysds.runtime.instructions.cp.KahanObject;
 import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
 
 /**
@@ -81,7 +80,7 @@
 	 * 
 	 * @return an integer of count of values.
 	 */
-	public abstract int getValuesLength();
+	public abstract int size();
 
 	/**
 	 * Applies the scalar operation on the dictionary. Note that this operation modifies the underlying data, and
@@ -110,6 +109,8 @@
 	 */
 	public abstract ADictionary clone();
 
+	public abstract ADictionary cloneAndExtend(int len);
+
 	/**
 	 * Aggregates the columns into the target double array provided.
 	 * 
@@ -120,7 +121,7 @@
 	 */
 	public void aggregateCols(double[] c, Builtin fn, int[] colIndexes) {
 		int ncol = colIndexes.length;
-		int vlen = getValuesLength() / ncol;
+		int vlen = size() / ncol;
 		// double[] ret = init;
 		// System.out.println(c.length + " " + ncol);
 		for(int k = 0; k < vlen; k++)
@@ -178,27 +179,24 @@
 	 * Note if the number of columns is one the actual dictionaries values are simply returned.
 	 * 
 	 * @param kplus     The function to apply to each value in the rows
-	 * @param kbuff     The buffer to use to aggregate the value.
 	 * @param nrColumns The number of columns in the ColGroup to know how to get the values from the dictionary.
 	 * @return a double array containing the row sums from this dictionary.
 	 */
-	protected abstract double[] sumAllRowsToDouble(KahanFunction kplus, KahanObject kbuff, int nrColumns);
+	protected abstract double[] sumAllRowsToDouble(KahanFunction kplus, int nrColumns);
 
 	/**
 	 * Sum the values at a specific row.
 	 * 
 	 * @param k         The row index to sum
 	 * @param kplus     The operator to use
-	 * @param kbuff     The buffer to aggregate inside.
 	 * @param nrColumns The number of columns
 	 * @return The sum of the row.
 	 */
-	protected abstract double sumRow(int k, KahanFunction kplus, KahanObject kbuff, int nrColumns);
-
+	protected abstract double sumRow(int k, KahanFunction kplus, int nrColumns);
 
 	protected abstract void colSum(double[] c, int[] counts, int[] colIndexes, KahanFunction kplus);
 
-	protected abstract double sum(int[] counts, int ncol,  KahanFunction kplus);
-	
+	protected abstract double sum(int[] counts, int ncol, KahanFunction kplus);
+
 	public abstract StringBuilder getString(StringBuilder sb, int colIndexes);
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java
index 7929526..60524a4 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java
@@ -29,6 +29,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.data.SparseRow;
 import org.apache.sysds.runtime.functionobjects.Builtin;
 import org.apache.sysds.runtime.matrix.data.IJV;
@@ -203,6 +204,20 @@
 	public abstract void decompressToBlock(MatrixBlock target, int rl, int ru, int offT);
 
 	/**
+	 * Decompress the contents of this column group into the specified full matrix block without managing the number of
+	 * non zeros.
+	 * 
+	 * @param target a matrix block where the columns covered by this column group have not yet been filled in.
+	 * @param rl     row lower
+	 * @param ru     row upper
+	 * @param offT   Offset into target to assign from
+	 * @param values The Values materialized in the dictionary
+	 * @param safe   If the number of non zeros should be ignored.
+	 */
+	public abstract void decompressToBlockSafe(MatrixBlock target, int rl, int ru, int offT, double[] values,
+		boolean safe);
+
+	/**
 	 * Decompress the contents of this column group into the specified full matrix block.
 	 * 
 	 * @param target a matrix block where the columns covered by this column group have not yet been filled in.
@@ -378,21 +393,16 @@
 	/**
 	 * Multiply with a sparse matrix on the left hand side, and add the values to the output result
 	 * 
-	 * @param spNrVals        the Number of sparse values (since the number of indexes does not align with number of
-	 *                        values)
-	 * @param indexes         the indexes for the sparse values in the given row.
-	 * @param sparseV         the sparse values.
-	 * @param result          the linearized output matrix
-	 * @param numVals         the number of values in the dictionary
-	 * @param values          the dictionary values materialized
-	 * @param numRows         the number of rows in the left hand side input matrix (the sparse one)
-	 * @param numCols         the number of columns in the compression.
-	 * @param row             the row index of the sparse row to multiply with.
-	 * @param MaterializedRow The sparse row materialized (should only be done if needed for the specific type of
-	 *                        ColumnGroup)
+	 * @param sb              The sparse block to multiply with
+	 * @param result          The linearized output matrix
+	 * @param values          The dictionary values materialized
+	 * @param numRows         The number of rows in the left hand side input matrix (the sparse one)
+	 * @param numCols         The number of columns in the compression.
+	 * @param row             The row index of the sparse row to multiply with.
+	 * @param MaterializedRow A Temporary dense row vector to materialize the sparse values into used for OLE
 	 */
-	public abstract void leftMultBySparseMatrix(int spNrVals, int[] indexes, double[] sparseV, double[] result,
-		int numVals, double[] values, int numRows, int numCols, int row, double[] MaterializedRow);
+	public abstract void leftMultBySparseMatrix(SparseBlock sb, double[] result, double[] values, int numRows,
+		int numCols, int row, double[] MaterializedRow);
 
 	/**
 	 * Perform the specified scalar operation directly on the compressed column group, without decompressing individual
@@ -420,7 +430,7 @@
 	 * @param op The operator used
 	 * @param c  Rhe output matrix block.
 	 */
-	public abstract void unaryAggregateOperations(AggregateUnaryOperator op, double[] c);
+	public abstract void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock c);
 
 	/**
 	 * Compute the max / min value contained in the dictionary.
@@ -440,7 +450,7 @@
 	 * @param rl The Starting Row to do aggregation from
 	 * @param ru The last Row to do aggregation to (not included)
 	 */
-	public abstract void unaryAggregateOperations(AggregateUnaryOperator op, double[] c, int rl, int ru);
+	public abstract void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock c, int rl, int ru);
 
 	/**
 	 * Create a column group iterator for a row index range.
@@ -487,4 +497,14 @@
 	 */
 	public abstract boolean isLossy();
 
+	/**
+	 * Is dense, signals that the entire column group is allocated an processed. This is useful in Row wise min and max
+	 * for instance, to avoid having to scan through each row to look for empty rows.
+	 * 
+	 * an example where it is true is DDC, Const and Uncompressed.
+	 * examples where false is OLE and RLE.
+	 * 
+	 * @return returns if the colgroup is allocated in a dense fashion.
+	 */
+	public abstract boolean isDense();
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
index a03a709..a4eb46a 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
@@ -22,11 +22,11 @@
 import java.util.Iterator;
 
 import org.apache.sysds.runtime.DMLCompressionException;
+import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.data.SparseRow;
 import org.apache.sysds.runtime.functionobjects.Builtin;
 import org.apache.sysds.runtime.functionobjects.KahanFunction;
 import org.apache.sysds.runtime.functionobjects.KahanPlus;
-import org.apache.sysds.runtime.instructions.cp.KahanObject;
 import org.apache.sysds.runtime.matrix.data.IJV;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
@@ -49,11 +49,11 @@
 	 * Constructs an Constant Colum Group, that contains only one tuple, with the given value.
 	 * 
 	 * @param colIndices The Colum indexes for the column group.
-	 * @param numRows	The number of rows contained in the group.
-	 * @param dict	   The dictionary containing one tuple for the entire compression.
+	 * @param numRows    The number of rows contained in the group.
+	 * @param dict       The dictionary containing one tuple for the entire compression.
 	 */
	public ColGroupConst(int[] colIndices, int numRows, ADictionary dict) {
		// No cached counts are passed: a const group has a single tuple covering all rows.
		super(colIndices, numRows, dict, null);
	}
 
 	@Override
@@ -75,11 +75,11 @@
 
 	@Override
 	protected void computeRowSums(double[] c, KahanFunction kplus, int rl, int ru, boolean mean) {
-		KahanObject kbuff = new KahanObject(0, 0);
+
 		KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject();
-		double[] vals = _dict.sumAllRowsToDouble(kplus, kbuff, _colIndexes.length);
+		double[] vals = _dict.sumAllRowsToDouble(kplus, _colIndexes.length);
 		for(int rix = rl; rix < ru; rix++) {
-			setandExecute(c, kbuff, kplus2, vals[0], rix * (2 + (mean ? 1 : 0)));
+			setandExecute(c, kplus2, vals[0], rix * (2 + (mean ? 1 : 0)));
 		}
 	}
 
@@ -89,9 +89,10 @@
 	}
 
	@Override
	protected void computeRowMxx(MatrixBlock c, Builtin builtin, int rl, int ru) {
		// Const groups are only produced for overlapping column groups, where a per-row min/max
		// cannot be computed group-locally; callers must materialize rows first.
		throw new DMLCompressionException(
			"Row max not supported for Const since Const is used for overlapping ColGroups,"
				+ " You have to materialize rows and then calculate row max");
	}
 
 	@Override
@@ -111,6 +112,11 @@
 
	@Override
	public void decompressToBlock(MatrixBlock target, int rl, int ru, int offT, double[] values) {
		// Delegate to the safe variant with safe = true.
		decompressToBlockSafe(target, rl, ru, offT, values, true);
	}
+
+	@Override
+	public void decompressToBlockSafe(MatrixBlock target, int rl, int ru, int offT, double[] values, boolean safe) {
 		final int ncol = getNumCols();
 
 		for(int i = rl; i < ru; i++, offT++)
@@ -216,10 +222,11 @@
 	}
 
 	@Override
-	public void leftMultBySparseMatrix(int spNrVals, int[] indexes, double[] sparseV, double[] c, int numVals,
-		double[] values, int numRows, int numCols, int row, double[] MaterializedRow) {
+	public void leftMultBySparseMatrix(SparseBlock sb, double[] c, double[] values, int numRows, int numCols, int row,
+		double[] MaterializedRow) {
 		double v = 0;
-		for(int i = 0; i < spNrVals; i++) {
+		double[] sparseV = sb.values(row);
+		for(int i = sb.pos(row); i < sb.pos(row) + sb.size(row); i++) {
 			v += sparseV[i];
 		}
 		int offC = row * numCols;
@@ -260,4 +267,9 @@
 			rnnz[i] = base;
 		}
 	}
+
+	@Override
+	public boolean isDense(){
+		return true;
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConverter.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConverter.java
index 9e7d68e..1fd8866 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConverter.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConverter.java
@@ -48,16 +48,16 @@
 		else if(group instanceof ColGroupRLE) {
 			ColGroupRLE in = (ColGroupRLE) group;
 			ret = new ColGroupRLE(colIndices, in._numRows, in.hasZeros(), in._dict, in.getBitmaps(),
-				in.getBitmapOffsets());
+				in.getBitmapOffsets(), null);
 		}
 		else if(group instanceof ColGroupOLE) {
 			ColGroupOLE in = (ColGroupOLE) group;
 			ret = new ColGroupOLE(colIndices, in._numRows, in.hasZeros(), in._dict, in.getBitmaps(),
-				in.getBitmapOffsets());
+				in.getBitmapOffsets(), null);
 		}
 		else if(group instanceof ColGroupDDC1) {
 			ColGroupDDC1 in = (ColGroupDDC1) group;
-			ret = new ColGroupDDC1(colIndices, in._numRows, in._dict, in.getData(), in._zeros);
+			ret = new ColGroupDDC1(colIndices, in._numRows, in._dict, in.getData(), in._zeros, null);
 		}
 		else {
 			throw new RuntimeException("Using '" + group.getClass() + "' instance of ColGroup not fully supported");
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
index 0263307..bac3982 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
@@ -24,10 +24,10 @@
 
 import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.utils.ABitmap;
+import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.functionobjects.Builtin;
 import org.apache.sysds.runtime.functionobjects.KahanFunction;
 import org.apache.sysds.runtime.functionobjects.KahanPlus;
-import org.apache.sysds.runtime.instructions.cp.KahanObject;
 import org.apache.sysds.runtime.matrix.data.IJV;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
@@ -46,8 +46,8 @@
 		super(colIndices, numRows, ubm, cs);
 	}
 
	protected ColGroupDDC(int[] colIndices, int numRows, ADictionary dict, int[] cachedCounts) {
		// cachedCounts may be null; presumably forwards previously computed value counts to
		// avoid recounting — TODO confirm against the ColGroupValue base class.
		super(colIndices, numRows, dict, cachedCounts);
	}
 
 	public CompressionType getCompType() {
@@ -56,11 +56,34 @@
 
	@Override
	public void decompressToBlock(MatrixBlock target, int rl, int ru, int off, double[] values) {
		// Delegate to the safe variant with safe = true.
		decompressToBlockSafe(target, rl, ru, off, values, true);
	}
+
+	@Override
+	public void decompressToBlockSafe(MatrixBlock target, int rl, int ru, int offT, double[] values, boolean safe) {
 		final int nCol = getNumCols();
-		for(int i = rl; i < ru; i++, off++) {
-			for(int j = 0; j < nCol; j++) {
-				double v = target.quickGetValue(off, _colIndexes[j]);
-				target.quickSetValue(off, _colIndexes[j], getData(i, j, values) + v);
+		double[] c = target.getDenseBlockValues();
+		for(int i = rl; i < ru; i++, offT++) {
+			int rowIndex = getIndex(i) * nCol;
+
+			if(rowIndex < values.length) {
+				int rc = offT * target.getNumColumns();
+				if(safe) {
+					for(int j = 0; j < nCol; j++) {
+
+						double v = c[rc + _colIndexes[j]];
+						double nv = c[rc + _colIndexes[j]] + values[rowIndex + j];
+						if(v == 0.0 && nv != 0.0) {
+							target.setNonZeros(target.getNonZeros() + 1);
+						}
+						c[rc + _colIndexes[j]] = nv;
+					}
+				}
+				else {
+					for(int j = 0; j < nCol; j++) {
+						c[rc + _colIndexes[j]] += values[rowIndex + j];
+					}
+				}
 			}
 		}
 	}
@@ -70,11 +93,14 @@
 		int ncol = getNumCols();
 		double[] dictionary = getValues();
 		for(int i = 0; i < _numRows; i++) {
-			for(int colIx = 0; colIx < ncol; colIx++) {
-				int origMatrixColIx = getColIndex(colIx);
-				int col = colIndexTargets[origMatrixColIx];
-				double cellVal = getData(i, colIx, dictionary);
-				target.quickSetValue(i, col, target.quickGetValue(i, col) + cellVal);
+			int rowIndex = getIndex(i) * ncol;
+			if(rowIndex < dictionary.length) {
+				for(int colIx = 0; colIx < ncol; colIx++) {
+					int origMatrixColIx = getColIndex(colIx);
+					int col = colIndexTargets[origMatrixColIx];
+					double cellVal = dictionary[rowIndex + colIx];
+					target.quickSetValue(i, col, target.quickGetValue(i, col) + cellVal);
+				}
 			}
 		}
 	}
@@ -116,14 +142,15 @@
 
 	@Override
 	public void countNonZerosPerRow(int[] rnnz, int rl, int ru) {
-		int ncol = getNumCols();
+		int ncol = _colIndexes.length;
 		final int numVals = getNumValues();
+		double[] values = _dict.getValues();
 		for(int i = rl; i < ru; i++) {
 			int lnnz = 0;
-			for(int colIx = 0; colIx < ncol; colIx++) {
-				int index = getIndex(i, colIx);
-				if(index < numVals) {
-					lnnz += (_dict.getValue(getIndex(i, colIx)) != 0) ? 1 : 0;
+			int index = getIndex(i);
+			if(index < numVals * _colIndexes.length) {
+				for(int colIx = index; colIx < ncol + index; colIx++) {
+					lnnz += (values[colIx]) != 0 ? 1 : 0;
 				}
 			}
 			rnnz[i - rl] += lnnz;
@@ -143,21 +170,21 @@
 	@Override
 	protected void computeRowSums(double[] c, KahanFunction kplus, int rl, int ru, boolean mean) {
 		final int numVals = getNumValues();
-		KahanObject kbuff = new KahanObject(0, 0);
 		KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject();
 		// pre-aggregate nnz per value tuple
-		double[] vals = _dict.sumAllRowsToDouble(kplus, kbuff, _colIndexes.length);
+		double[] vals = _dict.sumAllRowsToDouble(kplus, _colIndexes.length);
 
+		final int mult = (2 + (mean ? 1 : 0));
 		for(int rix = rl; rix < ru; rix++) {
 			int index = getIndex(rix);
 			if(index < numVals) {
-				setandExecute(c, kbuff, kplus2, vals[index], rix * (2 + (mean ? 1 : 0)));
+				setandExecute(c, kplus2, vals[index], rix * mult);
 			}
 		}
 	}
 
 	@Override
-	protected void computeRowMxx(double[] c, Builtin builtin, int rl, int ru) {
+	protected void computeRowMxx(MatrixBlock c, Builtin builtin, int rl, int ru) {
 		int ncol = getNumCols();
 		double[] dictionary = getValues();
 
@@ -165,10 +192,10 @@
 			int index = getIndex(i) * ncol;
 			for(int j = 0; j < ncol; j++) {
 				if(index < dictionary.length) {
-					c[i] = builtin.execute(c[i], dictionary[index + j]);
+					c.quickSetValue(i, 0, builtin.execute(c.quickGetValue(i, 0), dictionary[index + j]));
 				}
 				else {
-					c[i] = builtin.execute(c[i], 0.0);
+					c.quickSetValue(i, 0, builtin.execute(c.quickGetValue(i, 0), 0.0));
 				}
 			}
 		}
@@ -238,20 +265,11 @@
 	}
 
	@Override
	public void leftMultBySparseMatrix(SparseBlock sb, double[] c, double[] values, int numRows, int numCols, int row,
		double[] MaterializedRow) {
		// Pre-aggregate the sparse left-hand-side row into one partial sum per dictionary tuple,
		// then scale the dictionary values by those sums into the output row.
		final int numVals = getNumValues();
		double[] vals = preAggregateSparse(sb, row, numVals);
		postScaling(values, vals, c, numVals, row, numCols);
	}
 
 	@Override
@@ -292,6 +310,17 @@
 		return vals;
 	}
 
+	public double[] preAggregateSparse(SparseBlock sb, int row, int numVals) {
+		double[] vals = allocDVector(numVals + 1, true);
+		int[] indexes = sb.indexes(row);
+		double[] sparseV = sb.values(row);
+		for(int i = sb.pos(row); i < sb.size(row) + sb.pos(row); i++) {
+			int index = getIndex(indexes[i]);
+			vals[index] += sparseV[i];
+		}
+		return vals;
+	}
+
 	@Override
 	public void leftMultByRowVector(double[] a, double[] c, int numVals, double[] values) {
 
@@ -440,4 +469,10 @@
 	 */
 	protected abstract void setData(int r, int code);
 
+
+
+	@Override
+	public boolean isDense(){
+		return true;
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC1.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC1.java
index 84a7683..13a47fe 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC1.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC1.java
@@ -74,8 +74,9 @@
 		}
 	}
 
-	protected ColGroupDDC1(int[] colIndices, int numRows, ADictionary dict, byte[] data, boolean zeros) {
-		super(colIndices, numRows, dict);
+	protected ColGroupDDC1(int[] colIndices, int numRows, ADictionary dict, byte[] data, boolean zeros,
+		int[] cachedCounts) {
+		super(colIndices, numRows, dict, cachedCounts);
 		_data = data;
 		_zeros = zeros;
 	}
@@ -102,7 +103,8 @@
 	}
 
 	@Override
-	public void rightMultByMatrix(double[] preAggregatedB, double[] c, int thatNrColumns, int rl, int ru, int cl, int cu){
+	public void rightMultByMatrix(double[] preAggregatedB, double[] c, int thatNrColumns, int rl, int ru, int cl,
+		int cu) {
 		LinearAlgebraUtils.vectListAddDDC(preAggregatedB, c, _data, rl, ru, cl, cu, thatNrColumns, getNumValues());
 	}
 
@@ -137,7 +139,7 @@
 	@Override
 	protected double getData(int r, int colIx, double[] values) {
 		int index = (_data[r] & 0xFF) * getNumCols() + colIx;
-		return (index < values.length) ? values[index] :  0.0;
+		return (index < values.length) ? values[index] : 0.0;
 	}
 
 	@Override
@@ -191,24 +193,26 @@
 	public ColGroup scalarOperation(ScalarOperator op) {
 		double val0 = op.executeScalar(0);
 		if(op.sparseSafe || val0 == 0 || !_zeros) {
-			return new ColGroupDDC1(_colIndexes, _numRows, applyScalarOp(op), _data, _zeros);
+			return new ColGroupDDC1(_colIndexes, _numRows, applyScalarOp(op), _data, _zeros, getCachedCounts());
 		}
 		else {
-			return new ColGroupDDC1(_colIndexes, _numRows, applyScalarOp(op, val0, _colIndexes.length), _data, false);
+			return new ColGroupDDC1(_colIndexes, _numRows, applyScalarOp(op, val0, _colIndexes.length), _data, false,
+				getCachedCounts());
 		}
 	}
 
 	@Override
 	public ColGroup binaryRowOp(BinaryOperator op, double[] v, boolean sparseSafe) {
 		sparseSafe = sparseSafe || !_zeros;
-		return new ColGroupDDC1(_colIndexes, _numRows, applyBinaryRowOp(op.fn, v, sparseSafe), _data, !sparseSafe);
+		return new ColGroupDDC1(_colIndexes, _numRows, applyBinaryRowOp(op.fn, v, sparseSafe), _data, !sparseSafe,
+			getCachedCounts());
 	}
 
 	@Override
 	public String toString() {
 		StringBuilder sb = new StringBuilder();
 		sb.append(super.toString());
-		sb.append(" DataLength: " + this._data.length);
+		sb.append("\nDataLength: " + this._data.length);
 		sb.append(Arrays.toString(this._data));
 		return sb.toString();
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC2.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC2.java
index 6236b68..c828896 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC2.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC2.java
@@ -70,15 +70,16 @@
 		}
 	}
 
-	protected ColGroupDDC2(int[] colIndices, int numRows, ADictionary dict, char[] data, boolean zeros) {
-		super(colIndices, numRows, dict);
+	protected ColGroupDDC2(int[] colIndices, int numRows, ADictionary dict, char[] data, boolean zeros,
+		int[] cachedCounts) {
+		super(colIndices, numRows, dict, cachedCounts);
 		_data = data;
 		_zeros = zeros;
 	}
 
 	@Override
 	protected ColGroupType getColGroupType() {
-		return ColGroupType.DDC1;
+		return ColGroupType.DDC2;
 	}
 
 	/**
@@ -124,8 +125,9 @@
 	}
 
 	@Override
-	public void rightMultByMatrix(double[] preAggregatedB, double[] c, int thatNrColumns, int rl, int ru, int cl, int cu){
-		LinearAlgebraUtils.vectListAddDDC(preAggregatedB, c, _data, rl, ru, cl, cu, thatNrColumns,getNumValues());
+	public void rightMultByMatrix(double[] preAggregatedB, double[] c, int thatNrColumns, int rl, int ru, int cl,
+		int cu) {
+		LinearAlgebraUtils.vectListAddDDC(preAggregatedB, c, _data, rl, ru, cl, cu, thatNrColumns, getNumValues());
 	}
 
 	@Override
@@ -174,24 +176,32 @@
 	public ColGroup scalarOperation(ScalarOperator op) {
 		double val0 = op.executeScalar(0);
 		if(op.sparseSafe || val0 == 0 || !_zeros) {
-			return new ColGroupDDC2(_colIndexes, _numRows, applyScalarOp(op), _data, _zeros);
+			return new ColGroupDDC2(_colIndexes, _numRows, applyScalarOp(op), _data, _zeros, getCachedCounts());
 		}
 		else {
-			return new ColGroupDDC2(_colIndexes, _numRows, applyScalarOp(op, val0, _colIndexes.length), _data, false);
+			return new ColGroupDDC2(_colIndexes, _numRows, applyScalarOp(op, val0, _colIndexes.length), _data, false,
+				getCachedCounts());
 		}
 	}
 
 	@Override
 	public ColGroup binaryRowOp(BinaryOperator op, double[] v, boolean sparseSafe) {
 		sparseSafe = sparseSafe || !_zeros;
-		return new ColGroupDDC2(_colIndexes, _numRows, applyBinaryRowOp(op.fn, v, sparseSafe), _data, !sparseSafe);
+		return new ColGroupDDC2(_colIndexes, _numRows, applyBinaryRowOp(op.fn, v, sparseSafe), _data, !sparseSafe,
+			getCachedCounts());
 	}
 
 	@Override
 	public String toString() {
 		StringBuilder sb = new StringBuilder();
 		sb.append(super.toString());
-		sb.append(" DataLength: " + this._data.length);
+		sb.append("\nDataLength: " + this._data.length);
+		sb.append("[");
+		for(char c : this._data){
+			sb.append((int)c);
+			sb.append(" ");
+		}
+		sb.append("]");
 		return sb.toString();
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
index 34ac053..2deb2bb 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
@@ -20,6 +20,7 @@
 package org.apache.sysds.runtime.compress.colgroup;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -29,6 +30,8 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.DMLCompressionException;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.BitmapEncoder;
@@ -46,7 +49,7 @@
  * Factory pattern for constructing ColGroups.
  */
 public class ColGroupFactory {
-	// private static final Log LOG = LogFactory.getLog(ColGroupFactory.class.getName());
+	private static final Log LOG = LogFactory.getLog(ColGroupFactory.class.getName());
 
 	/**
 	 * The actual compression method, that handles the logic of compressing multiple columns together. This method also
@@ -61,6 +64,7 @@
 	 */
 	public static ColGroup[] compressColGroups(MatrixBlock in, HashMap<Integer, Double> compRatios, List<int[]> groups,
 		CompressionSettings compSettings, int k) {
+
 		if(k <= 1) {
 			return compressColGroups(in, compRatios, groups, compSettings);
 		}
@@ -75,7 +79,7 @@
 				for(Future<ColGroup> lrtask : rtask)
 					ret.add(lrtask.get());
 				pool.shutdown();
-				return ret.toArray(new ColGroup[0]);
+				return ret.toArray(new ColGroup[groups.size()]);
 			}
 			catch(InterruptedException | ExecutionException e) {
 				// If there is an error in the parallel execution default to the non parallel implementation
@@ -89,7 +93,6 @@
 		ColGroup[] ret = new ColGroup[groups.size()];
 		for(int i = 0; i < groups.size(); i++)
 			ret[i] = compressColGroup(in, compRatios, groups.get(i), compSettings);
-
 		return ret;
 	}
 
@@ -144,7 +147,6 @@
 		CompressionSettings compSettings) {
 
 		int[] allGroupIndices = colIndexes.clone();
-
 		CompressedSizeInfoColGroup sizeInfo;
 		// The compression type is decided based on a full bitmap since it
 		// will be reused for the actual compression step.
@@ -152,13 +154,13 @@
 		PriorityQueue<CompressedColumn> compRatioPQ = CompressedColumn.makePriorityQue(compRatios, colIndexes);
 
 		// Switching to exact estimator here, when doing the actual compression.
-		CompressedSizeEstimator estimator = new CompressedSizeEstimatorExact(in, compSettings);
+		CompressedSizeEstimator estimator = new CompressedSizeEstimatorExact(in, compSettings, compSettings.transposed);
 
 		while(true) {
 
 			// STEP 1.
 			// Extract the entire input column list and observe compression ratio
-			ubm = BitmapEncoder.extractBitmap(colIndexes, in, compSettings);
+			ubm = BitmapEncoder.extractBitmap(colIndexes, in, compSettings.transposed);
 			sizeInfo = new CompressedSizeInfoColGroup(estimator.estimateCompressedColGroupSize(ubm),
 				compSettings.validCompressions);
 
@@ -180,9 +182,9 @@
 
 			// Furthermore performance of a compressed representation that does not compress much, is decremental to
 			// overall performance.
-			
+
 			if(compRatio > 1.0 || compSettings.columnPartitioner == PartitionerType.COST) {
-				int rlen = compSettings.transposeInput ? in.getNumColumns() : in.getNumRows();
+				int rlen = compSettings.transposed ? in.getNumColumns() : in.getNumRows();
 				return compress(colIndexes, rlen, ubm, sizeInfo.getBestCompressionType(), compSettings, in);
 			}
 			else {
@@ -233,7 +235,7 @@
 			case OLE:
 				return new ColGroupOLE(colIndexes, rlen, ubm, cs);
 			case UNCOMPRESSED:
-				return new ColGroupUncompressed(colIndexes, rawMatrixBlock, cs);
+				return new ColGroupUncompressed(colIndexes, rawMatrixBlock, cs.transposed);
 			default:
 				throw new DMLCompressionException("Not implemented ColGroup Type compressed in factory.");
 		}
@@ -267,7 +269,8 @@
 
 		if(!remainingCols.isEmpty()) {
 			int[] list = remainingCols.stream().mapToInt(i -> i).toArray();
-			ColGroupUncompressed ucgroup = new ColGroupUncompressed(list, rawBlock, compSettings);
+			LOG.warn("Uncompressable Columns: " + Arrays.toString(list));
+			ColGroupUncompressed ucgroup = new ColGroupUncompressed(list, rawBlock, compSettings.transposed);
 			_colGroups.add(ucgroup);
 		}
 		return _colGroups;
@@ -279,4 +282,5 @@
 			ret.add(i);
 		return ret;
 	}
+
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java
index a48532d..a26c177 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java
@@ -27,11 +27,11 @@
 import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.utils.ABitmap;
 import org.apache.sysds.runtime.compress.utils.LinearAlgebraUtils;
+import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.data.SparseRow;
 import org.apache.sysds.runtime.functionobjects.Builtin;
 import org.apache.sysds.runtime.functionobjects.KahanFunction;
 import org.apache.sysds.runtime.functionobjects.KahanPlus;
-import org.apache.sysds.runtime.instructions.cp.KahanObject;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
 import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
@@ -71,8 +71,8 @@
 	}
 
 	protected ColGroupOLE(int[] colIndices, int numRows, boolean zeros, ADictionary dict, char[] bitmaps,
-		int[] bitmapOffs) {
-		super(colIndices, numRows, zeros, dict);
+		int[] bitmapOffs, int[] counts) {
+		super(colIndices, numRows, zeros, dict, counts);
 		_data = bitmaps;
 		_ptr = bitmapOffs;
 	}
@@ -89,6 +89,11 @@
 
 	@Override
 	public void decompressToBlock(MatrixBlock target, int rl, int ru, int offT, double[] values) {
+		decompressToBlockSafe(target, rl, ru, offT, values, true);
+	}
+
+	@Override
+	public void decompressToBlockSafe(MatrixBlock target, int rl, int ru, int offT, double[] values, boolean safe) {
 
 		final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
 		final int numCols = getNumCols();
@@ -96,7 +101,7 @@
 
 		// cache blocking config and position array
 		int[] apos = skipScan(numVals, rl);
-
+		double[] c = target.getDenseBlockValues();
 		// cache conscious append via horizontal scans
 		for(int bi = (rl / blksz) * blksz; bi < ru; bi += blksz) {
 			for(int k = 0, off = 0; k < numVals; k++, off += numCols) {
@@ -112,9 +117,21 @@
 					int row = bi + _data[i];
 					if(row >= rl && row < ru) {
 						int rix = row - (rl - offT);
+						int rc = rix * target.getNumColumns();
 						for(int j = 0; j < numCols; j++) {
-							double v = target.quickGetValue(rix, _colIndexes[j]);
-							target.setValue(rix, _colIndexes[j], values[off + j] + v);
+							if(safe) {
+
+								double v = c[rc + _colIndexes[j]];
+								double nv = c[rc + _colIndexes[j]] + values[off + j];
+								if(v == 0.0 && nv != 0.0) {
+									target.setNonZeros(target.getNonZeros() + 1);
+								}
+								c[rc + _colIndexes[j]] = nv;
+							}
+							else {
+								c[rc + _colIndexes[j]] += values[off + j];
+							}
+
 						}
 					}
 				}
@@ -243,7 +260,8 @@
 		// LOG.debug(this.toString());
 		// Note 0 is because the size can be calculated based on the given values,
 		// And because the fourth argument is only needed in estimation, not when an OLE ColGroup is created.
-		return ColGroupSizes.estimateInMemorySizeOLE(getNumCols(), getValues().length, _data.length, 0, isLossy());
+		return ColGroupSizes
+			.estimateInMemorySizeOLE(getNumCols(), _dict.size(), (_data == null) ? 0 : _data.length, _numRows, isLossy());
 	}
 
 	@Override
@@ -252,7 +270,7 @@
 		// fast path: sparse-safe operations
 		// Note that bitmaps don't change and are shallow-copied
 		if(op.sparseSafe || val0 == 0 || !_zeros) {
-			return new ColGroupOLE(_colIndexes, _numRows, _zeros, applyScalarOp(op), _data, _ptr);
+			return new ColGroupOLE(_colIndexes, _numRows, _zeros, applyScalarOp(op), _data, _ptr, getCachedCounts());
 		}
 		// slow path: sparse-unsafe operations (potentially create new bitmap)
 		// note: for efficiency, we currently don't drop values that become 0
@@ -260,7 +278,7 @@
 		int[] loff = computeOffsets(lind);
 
 		if(loff.length == 0) { // empty offset list: go back to fast path
-			return new ColGroupOLE(_colIndexes, _numRows, false, applyScalarOp(op), _data, _ptr);
+			return new ColGroupOLE(_colIndexes, _numRows, false, applyScalarOp(op), _data, _ptr, getCachedCounts());
 		}
 
 		ADictionary rvalues = applyScalarOp(op, val0, getNumCols());
@@ -270,7 +288,7 @@
 		int[] rbitmapOffs = Arrays.copyOf(_ptr, _ptr.length + 1);
 		rbitmapOffs[rbitmapOffs.length - 1] = rbitmaps.length;
 
-		return new ColGroupOLE(_colIndexes, _numRows, false, rvalues, rbitmaps, rbitmapOffs);
+		return new ColGroupOLE(_colIndexes, _numRows, false, rvalues, rbitmaps, rbitmapOffs, getCachedCounts());
 	}
 
 	@Override
@@ -280,7 +298,8 @@
 		// fast path: sparse-safe operations
 		// Note that bitmaps don't change and are shallow-copied
 		if(sparseSafe) {
-			return new ColGroupOLE(_colIndexes, _numRows, _zeros, applyBinaryRowOp(op.fn, v, sparseSafe), _data, _ptr);
+			return new ColGroupOLE(_colIndexes, _numRows, _zeros, applyBinaryRowOp(op.fn, v, sparseSafe), _data, _ptr,
+				getCachedCounts());
 		}
 
 		// slow path: sparse-unsafe operations (potentially create new bitmap)
@@ -288,7 +307,8 @@
 		boolean[] lind = computeZeroIndicatorVector();
 		int[] loff = computeOffsets(lind);
 		if(loff.length == 0) { // empty offset list: go back to fast path
-			return new ColGroupOLE(_colIndexes, _numRows, false, applyBinaryRowOp(op.fn, v, true), _data, _ptr);
+			return new ColGroupOLE(_colIndexes, _numRows, false, applyBinaryRowOp(op.fn, v, true), _data, _ptr,
+				getCachedCounts());
 		}
 		ADictionary rvalues = applyBinaryRowOp(op.fn, v, sparseSafe);
 		char[] lbitmap = genOffsetBitmap(loff, loff.length);
@@ -297,7 +317,16 @@
 		int[] rbitmapOffs = Arrays.copyOf(_ptr, _ptr.length + 1);
 		rbitmapOffs[rbitmapOffs.length - 1] = rbitmaps.length;
 
-		return new ColGroupOLE(_colIndexes, _numRows, false, rvalues, rbitmaps, rbitmapOffs);
+		// Note: for efficiency of subsequent operations (and lower memory usage,
+		// since they share index structures),
+		// the materialized zero handling is also applied to this column group,
+		// so that subsequent operations do not suffer from missing zeros.
+		_data = rbitmaps;
+		_ptr = rbitmapOffs;
+		_zeros = false;
+		_dict = _dict.cloneAndExtend(_colIndexes.length);
+
+		return new ColGroupOLE(_colIndexes, _numRows, false, rvalues, rbitmaps, rbitmapOffs, getCachedCounts());
 	}
 
 	@Override
@@ -616,11 +645,14 @@
 	}
 
 	@Override
-	public void leftMultBySparseMatrix(int spNrVals, int[] indexes, double[] sparseV, double[] c, int numVals,
-		double[] values, int numRows, int numCols, int row, double[] tmpA) {
+	public void leftMultBySparseMatrix(SparseBlock sb, double[] c, double[] values, int numRows, int numCols, int row,
+		double[] tmpA) {
 		final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
-
-		if(numVals >= 1 && _numRows > blksz) {
+		final int numVals = getNumValues();
+		int sparseEndIndex = sb.size(row) + sb.pos(row);
+		int[] indexes = sb.indexes(row);
+		double[] sparseV = sb.values(row);
+		if(numVals > 1 && _numRows > blksz) {
 
 			// cache blocking config (see matrix-vector mult for explanation)
 			final int blksz2 = 2 * CompressionSettings.BITMAP_BLOCK_SZ;
@@ -629,15 +661,11 @@
 			int[] apos = allocIVector(numVals, true);
 			double[] cvals = allocDVector(numVals, true);
 			// step 2: cache conscious matrix-vector via horizontal scans
-			int pI = 0;
+			int pI = sb.pos(row);
 			for(int ai = 0; ai < _numRows; ai += blksz2) {
 				int aimax = Math.min(ai + blksz2, _numRows);
-
-				for(int i = 0; i < blksz2; i++) {
-					tmpA[i] = 0;
-				}
-
-				for(; pI < spNrVals && indexes[pI] < aimax; pI++) {
+				Arrays.fill(tmpA, 0);
+				for(; pI < sparseEndIndex && indexes[pI] < aimax; pI++) {
 					if(indexes[pI] >= ai)
 						tmpA[indexes[pI] - ai] = sparseV[pI];
 				}
@@ -675,13 +703,11 @@
 				int boff = _ptr[k];
 				int blen = len(k);
 				double vsum = 0;
-				int pI = 0;
+				int pI = sb.pos(row);
 				for(int bix = 0, off = 0; bix < blen; bix += _data[boff + bix] + 1, off += blksz) {
 					// blockId = off / blksz;
-					for(int i = 0; i < blksz; i++) {
-						tmpA[i] = 0;
-					}
-					for(; pI < spNrVals && indexes[pI] < off + blksz; pI++) {
+					Arrays.fill(tmpA, 0);
+					for(; pI < sparseEndIndex && indexes[pI] < off + blksz; pI++) {
 						if(indexes[pI] >= off)
 							tmpA[indexes[pI] - off] = sparseV[pI];
 					}
@@ -703,7 +729,6 @@
 
 	@Override
 	protected final void computeRowSums(double[] c, KahanFunction kplus, int rl, int ru, boolean mean) {
-		KahanObject kbuff = new KahanObject(0, 0);
 		KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject();
 		final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
 		final int numVals = getNumValues();
@@ -713,7 +738,7 @@
 
 			// step 1: prepare position and value arrays
 			int[] apos = skipScan(numVals, rl);
-			double[] aval = _dict.sumAllRowsToDouble(kplus, kbuff, _colIndexes.length);
+			double[] aval = _dict.sumAllRowsToDouble(kplus, _colIndexes.length);
 
 			// step 2: cache conscious row sums via horizontal scans
 			for(int bi = rl; bi < ru; bi += blksz2) {
@@ -734,7 +759,7 @@
 						// compute partial results
 						for(int i = 0; i < len; i++) {
 							int rix = ii + _data[pos + i];
-							setandExecute(c, kbuff, kplus2, val, rix * (2 + (mean ? 1 : 0)));
+							setandExecute(c, kplus2, val, rix * (2 + (mean ? 1 : 0)));
 						}
 						bix += len + 1;
 					}
@@ -749,7 +774,7 @@
 				// prepare value-to-add for entire value bitmap
 				int boff = _ptr[k];
 				int blen = len(k);
-				double val = _dict.sumRow(k, kplus, kbuff, _colIndexes.length);
+				double val = _dict.sumRow(k, kplus, _colIndexes.length);
 
 				// iterate over bitmap blocks and add values
 				if(val != 0) {
@@ -759,7 +784,7 @@
 						slen = _data[boff + bix];
 						for(int i = 1; i <= slen; i++) {
 							int rix = off + _data[boff + bix + i];
-							setandExecute(c, kbuff, kplus2, val, rix * (2 + (mean ? 1 : 0)));
+							setandExecute(c, kplus2, val, rix * (2 + (mean ? 1 : 0)));
 						}
 					}
 				}
@@ -773,7 +798,7 @@
 	}
 
 	@Override
-	protected final void computeRowMxx(double[] c, Builtin builtin, int rl, int ru) {
+	protected final void computeRowMxx(MatrixBlock c, Builtin builtin, int rl, int ru) {
 		// NOTE: zeros handled once for all column groups outside
 		final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
 		final int numVals = getNumValues();
@@ -794,7 +819,7 @@
 				slen = _data[boff + bix];
 				for(int i = 1; i <= slen; i++) {
 					int rix = off + _data[boff + bix + i];
-					c[rix] = builtin.execute(c[rix], val);
+					c.quickSetValue(rix, 0, builtin.execute(c.quickGetValue(rix, 0), val));
 				}
 			}
 		}
@@ -856,11 +881,12 @@
 				int bix = apos[k];
 
 				// iterate over bitmap blocks and add values
-				for(int off = bi, slen = 0; bix < blen && off < bimax; bix += slen + 1, off += blksz) {
-					slen = _data[boff + bix];
+				for(int off = bi; bix < blen && off < bimax; off += blksz) {
+					int slen = _data[boff + bix];
 					for(int blckIx = 1; blckIx <= slen; blckIx++) {
 						rnnz[off + _data[boff + bix + blckIx] - rl] += numCols;
 					}
+					bix += slen + 1;
 				}
 
 				apos[k] = bix;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java
index c4b6d08..483a7a4 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java
@@ -65,8 +65,8 @@
 		super(colIndices, numRows, ubm, cs);
 	}
 
-	protected ColGroupOffset(int[] colIndices, int numRows, boolean zeros, ADictionary dict) {
-		super(colIndices, numRows, dict);
+	protected ColGroupOffset(int[] colIndices, int numRows, boolean zeros, ADictionary dict, int[] cachedCounts) {
+		super(colIndices, numRows, dict, cachedCounts);
 		_zeros = zeros;
 	}
 
@@ -90,13 +90,13 @@
 	@Override
 	public long estimateInMemorySize() {
 		// Could use a ternary operator, but it looks odd with our code formatter here.
-		if(_data == null) {
-			return ColGroupSizes.estimateInMemorySizeOffset(getNumCols(), _colIndexes.length, 0, 0, isLossy());
-		}
-		else {
-			return ColGroupSizes
-				.estimateInMemorySizeOffset(getNumCols(), getValues().length, _ptr.length, _data.length, isLossy());
-		}
+
+		return ColGroupSizes.estimateInMemorySizeOffset(getNumCols(),
+			getValues() == null ? 0 : getValues().length,
+			_ptr == null ? 0 : _ptr.length,
+			_data == null ? 0 : _data.length,
+			isLossy());
+
 	}
 
 	// generic decompression for OLE/RLE, to be overwritten for performance
@@ -484,4 +484,9 @@
 			while(!_inclZeros && (_vpos < 0 || _dict.getValue(_vpos * getNumCols() + _cpos) == 0));
 		}
 	}
+
+	@Override
+	public boolean isDense(){
+		return false;
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java
index 551bfb3..cdbf615 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java
@@ -28,11 +28,11 @@
 import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.utils.ABitmap;
 import org.apache.sysds.runtime.compress.utils.LinearAlgebraUtils;
+import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.data.SparseRow;
 import org.apache.sysds.runtime.functionobjects.Builtin;
 import org.apache.sysds.runtime.functionobjects.KahanFunction;
 import org.apache.sysds.runtime.functionobjects.KahanPlus;
-import org.apache.sysds.runtime.instructions.cp.KahanObject;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.Pair;
 import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
@@ -69,13 +69,11 @@
 
 		// compact bitmaps to linearized representation
 		createCompressedBitmaps(numVals, totalLen, lbitmaps);
-
-		// LOG.error(this);
 	}
 
 	protected ColGroupRLE(int[] colIndices, int numRows, boolean zeros, ADictionary dict, char[] bitmaps,
-		int[] bitmapOffs) {
-		super(colIndices, numRows, zeros, dict);
+		int[] bitmapOffs, int[] cachedCounts) {
+		super(colIndices, numRows, zeros, dict, cachedCounts);
 		_data = bitmaps;
 		_ptr = bitmapOffs;
 	}
@@ -92,6 +90,11 @@
 
 	@Override
 	public void decompressToBlock(MatrixBlock target, int rl, int ru, int offT, double[] values) {
+		decompressToBlockSafe(target, rl, ru, offT, values, true);
+	}
+
+	@Override
+	public void decompressToBlockSafe(MatrixBlock target, int rl, int ru, int offT, double[] values, boolean safe) {
 		final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
 		final int numCols = getNumCols();
 		final int numVals = getNumValues();
@@ -588,28 +591,32 @@
 	}
 
 	@Override
-	public void leftMultBySparseMatrix(int spNrVals, int[] indexes, double[] sparseV, double[] c, int numVals,
-		double[] values, int numRows, int numCols, int row, double[] MaterializedRow) {
+	public void leftMultBySparseMatrix(SparseBlock sb, double[] c, double[] values, int numRows, int numCols, int row,
+		double[] MaterializedRow) {
+
+		final int numVals = getNumValues();
+		int sparseEndIndex = sb.size(row) + sb.pos(row);
+		int[] indexes = sb.indexes(row);
+		double[] sparseV = sb.values(row);
 		for(int k = 0, valOff = 0; k < numVals; k++, valOff += _colIndexes.length) {
 			int boff = _ptr[k];
 			int blen = len(k);
 
 			double vsum = 0;
-			int pointerIndexes = 0;
+			int pointSparse = sb.pos(row);
 			int curRunEnd = 0;
 			for(int bix = 0; bix < blen; bix += 2) {
 				int curRunStartOff = curRunEnd + _data[boff + bix];
 				int curRunLen = _data[boff + bix + 1];
 				curRunEnd = curRunStartOff + curRunLen;
-				while(pointerIndexes < spNrVals && indexes[pointerIndexes] < curRunStartOff) {
-					pointerIndexes++;
+				while(pointSparse < sparseEndIndex && indexes[pointSparse] < curRunStartOff) {
+					pointSparse++;
 				}
-				while(pointerIndexes != spNrVals && indexes[pointerIndexes] >= curRunStartOff &&
-					indexes[pointerIndexes] < curRunEnd) {
-					vsum += sparseV[pointerIndexes];
-					pointerIndexes++;
+				while(pointSparse != sparseEndIndex && indexes[pointSparse] >= curRunStartOff &&
+					indexes[pointSparse] < curRunEnd) {
+					vsum += sparseV[pointSparse++];
 				}
-				if(pointerIndexes == spNrVals) {
+				if(pointSparse == sparseEndIndex) {
 					break;
 				}
 			}
@@ -629,7 +636,7 @@
 		// fast path: sparse-safe operations
 		// Note that bitmaps don't change and are shallow-copied
 		if(op.sparseSafe || val0 == 0 || !_zeros) {
-			return new ColGroupRLE(_colIndexes, _numRows, _zeros, applyScalarOp(op), _data, _ptr);
+			return new ColGroupRLE(_colIndexes, _numRows, _zeros, applyScalarOp(op), _data, _ptr, getCachedCounts());
 		}
 
 		// slow path: sparse-unsafe operations (potentially create new bitmap)
@@ -637,7 +644,7 @@
 		boolean[] lind = computeZeroIndicatorVector();
 		int[] loff = computeOffsets(lind);
 		if(loff.length == 0) { // empty offset list: go back to fast path
-			return new ColGroupRLE(_colIndexes, _numRows, false, applyScalarOp(op), _data, _ptr);
+			return new ColGroupRLE(_colIndexes, _numRows, false, applyScalarOp(op), _data, _ptr, getCachedCounts());
 		}
 
 		ADictionary rvalues = applyScalarOp(op, val0, getNumCols());
@@ -647,7 +654,7 @@
 		int[] rbitmapOffs = Arrays.copyOf(_ptr, _ptr.length + 1);
 		rbitmapOffs[rbitmapOffs.length - 1] = rbitmaps.length;
 
-		return new ColGroupRLE(_colIndexes, _numRows, false, rvalues, rbitmaps, rbitmapOffs);
+		return new ColGroupRLE(_colIndexes, _numRows, false, rvalues, rbitmaps, rbitmapOffs, getCachedCounts());
 	}
 
 	@Override
@@ -657,7 +664,8 @@
 		// fast path: sparse-safe operations
 		// Note that bitmaps don't change and are shallow-copied
 		if(sparseSafe) {
-			return new ColGroupRLE(_colIndexes, _numRows, _zeros, applyBinaryRowOp(op.fn, v, sparseSafe), _data, _ptr);
+			return new ColGroupRLE(_colIndexes, _numRows, _zeros, applyBinaryRowOp(op.fn, v, sparseSafe), _data, _ptr,
+				getCachedCounts());
 		}
 
 		// slow path: sparse-unsafe operations (potentially create new bitmap)
@@ -665,7 +673,8 @@
 		boolean[] lind = computeZeroIndicatorVector();
 		int[] loff = computeOffsets(lind);
 		if(loff.length == 0) { // empty offset list: go back to fast path
-			return new ColGroupRLE(_colIndexes, _numRows, false, applyBinaryRowOp(op.fn, v, true), _data, _ptr);
+			return new ColGroupRLE(_colIndexes, _numRows, false, applyBinaryRowOp(op.fn, v, true), _data, _ptr,
+				getCachedCounts());
 		}
 
 		ADictionary rvalues = applyBinaryRowOp(op.fn, v, sparseSafe);
@@ -675,7 +684,16 @@
 		int[] rbitmapOffs = Arrays.copyOf(_ptr, _ptr.length + 1);
 		rbitmapOffs[rbitmapOffs.length - 1] = rbitmaps.length;
 
-		return new ColGroupRLE(_colIndexes, _numRows, false, rvalues, rbitmaps, rbitmapOffs);
+		// Note: for efficiency of subsequent operations (and lower memory usage,
+		// since they share index structures),
+		// the materialized zero handling is also applied to this column group,
+		// so that subsequent operations do not suffer from missing zeros.
+		_data = rbitmaps;
+		_ptr = rbitmapOffs;
+		_zeros = false;
+		_dict = _dict.cloneAndExtend(_colIndexes.length);
+
+		return new ColGroupRLE(_colIndexes, _numRows, false, rvalues, rbitmaps, rbitmapOffs, getCachedCounts());
 	}
 
 	@Override
@@ -685,7 +703,6 @@
 
 	@Override
 	protected final void computeRowSums(double[] c, KahanFunction kplus, int rl, int ru, boolean mean) {
-		KahanObject kbuff = new KahanObject(0, 0);
 		KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject();
 
 		final int numVals = getNumValues();
@@ -698,7 +715,7 @@
 			// current pos / values per RLE list
 			int[] astart = new int[numVals];
 			int[] apos = skipScan(numVals, rl, astart);
-			double[] aval = _dict.sumAllRowsToDouble(kplus, kbuff, _colIndexes.length);
+			double[] aval = _dict.sumAllRowsToDouble(kplus, _colIndexes.length);
 
 			// step 2: cache conscious matrix-vector via horizontal scans
 			for(int bi = rl; bi < ru; bi += blksz) {
@@ -719,7 +736,7 @@
 						int from = Math.max(bi, start + lstart);
 						int to = Math.min(start + lstart + llen, bimax);
 						for(int rix = from; rix < to; rix++) {
-							setandExecute(c, kbuff, kplus2, val, rix * (2 + (mean ? 1 : 0)));
+							setandExecute(c, kplus2, val, rix * (2 + (mean ? 1 : 0)));
 						}
 						if(start + lstart + llen >= bimax)
 							break;
@@ -736,7 +753,7 @@
 			for(int k = 0; k < numVals; k++) {
 				int boff = _ptr[k];
 				int blen = len(k);
-				double val = _dict.sumRow(k, kplus, kbuff, _colIndexes.length);
+				double val = _dict.sumRow(k, kplus, _colIndexes.length);
 
 				if(val != 0.0) {
 					Pair<Integer, Integer> tmp = skipScanVal(k, rl);
@@ -747,7 +764,7 @@
 						curRunStartOff = curRunEnd + _data[boff + bix];
 						curRunEnd = curRunStartOff + _data[boff + bix + 1];
 						for(int rix = curRunStartOff; rix < curRunEnd && rix < ru; rix++) {
-							setandExecute(c, kbuff, kplus2, val, rix * (2 + (mean ? 1 : 0)));
+							setandExecute(c, kplus2, val, rix * (2 + (mean ? 1 : 0)));
 						}
 					}
 				}
@@ -761,7 +778,7 @@
 	}
 
 	@Override
-	protected final void computeRowMxx(double[] c, Builtin builtin, int rl, int ru) {
+	protected final void computeRowMxx(MatrixBlock c, Builtin builtin, int rl, int ru) {
 		// NOTE: zeros handled once for all column groups outside
 		final int numVals = getNumValues();
 		// double[] c = result.getDenseBlockValues();
@@ -780,7 +797,7 @@
 				curRunStartOff = curRunEnd + _data[boff + bix];
 				curRunEnd = curRunStartOff + _data[boff + bix + 1];
 				for(int rix = curRunStartOff; rix < curRunEnd && rix < ru; rix++)
-					c[rix] = builtin.execute(c[rix], val);
+					c.quickSetValue(rix, 0, builtin.execute(c.quickGetValue(rix, 0), val));
 			}
 		}
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java
index 4688ecd..3a56109 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java
@@ -41,9 +41,10 @@
 	public static long estimateInMemorySizeGroupValue(int nrColumns, int nrValues, boolean lossy) {
 		long size = estimateInMemorySizeGroup(nrColumns);
 		size += 8; // Dictionary Reference.
-		if(lossy){
+		if(lossy) {
 			size += QDictionary.getInMemorySize(nrValues);
-		}else{
+		}
+		else {
 			size += Dictionary.getInMemorySize(nrValues);
 		}
 		return size;
@@ -70,19 +71,22 @@
 		return size;
 	}
 
-	public static long estimateInMemorySizeOffset(int nrColumns, int nrValues, int pointers, int offsetLength, boolean lossy) {
+	public static long estimateInMemorySizeOffset(int nrColumns, int nrValues, int pointers, int offsetLength,
+		boolean lossy) {
 		long size = estimateInMemorySizeGroupValue(nrColumns, nrValues, lossy);
 		size += MemoryEstimates.intArrayCost(pointers);
 		size += MemoryEstimates.charArrayCost(offsetLength);
 		return size;
 	}
 
-	public static long estimateInMemorySizeOLE(int nrColumns, int nrValues, int offsetLength, int nrRows, boolean lossy) {
+	public static long estimateInMemorySizeOLE(int nrColumns, int nrValues, int offsetLength, int nrRows,
+		boolean lossy) {
+		// LOG.error(nrColumns + " " + nrValues + " " + offsetLength + " " + nrRows + " " + lossy);
 		nrColumns = nrColumns > 0 ? nrColumns : 1;
 		offsetLength += (nrRows / CompressionSettings.BITMAP_BLOCK_SZ) * 2;
 		long size = 0;
 		size = estimateInMemorySizeOffset(nrColumns, nrValues, (nrValues / nrColumns) + 1, offsetLength, lossy);
-		if (nrRows > CompressionSettings.BITMAP_BLOCK_SZ * 2){
+		if(nrRows > CompressionSettings.BITMAP_BLOCK_SZ * 2) {
 			size += MemoryEstimates.intArrayCost((int) nrValues / nrColumns);
 		}
 		return size;
@@ -96,12 +100,13 @@
 		return size;
 	}
 
-	public static long estimateInMemorySizeCONST(int nrColumns, int nrValues, boolean lossy){
+	public static long estimateInMemorySizeCONST(int nrColumns, int nrValues, boolean lossy) {
 		long size = estimateInMemorySizeGroupValue(nrColumns, nrValues, lossy);
 		return size;
 	}
 
 	public static long estimateInMemorySizeUncompressed(int nrRows, int nrColumns, double sparsity) {
+		// LOG.error(nrRows + " " + nrColumns + " " + sparsity);
 		long size = 0;
 		// Since the Object is a col group the overhead from the Memory Size group is added
 		size += estimateInMemorySizeGroup(nrColumns);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
index bc50ca5..bab75c5 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
@@ -28,7 +28,6 @@
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.sysds.runtime.DMLCompressionException;
-import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.data.SparseBlock.Type;
 import org.apache.sysds.runtime.data.SparseRow;
@@ -74,11 +73,11 @@
 	 * @param compSettings   The Settings for how to compress this block, Here using information about the raw block if
 	 *                       it is transposed.
 	 */
-	protected ColGroupUncompressed(int[] colIndicesList, MatrixBlock rawBlock, CompressionSettings compSettings) {
-		super(colIndicesList, compSettings.transposeInput ? rawBlock.getNumColumns() : rawBlock.getNumRows());
+	protected ColGroupUncompressed(int[] colIndicesList, MatrixBlock rawBlock, boolean transposed) {
+		super(colIndicesList, transposed ? rawBlock.getNumColumns() : rawBlock.getNumRows());
 
 		// prepare meta data
-		int numRows = compSettings.transposeInput ? rawBlock.getNumColumns() : rawBlock.getNumRows();
+		int numRows = transposed ? rawBlock.getNumColumns() : rawBlock.getNumRows();
 
 		// Create a matrix with just the requested rows of the original block
 		_data = new MatrixBlock(numRows, _colIndexes.length, rawBlock.isInSparseFormat());
@@ -90,8 +89,9 @@
 		// special cases empty blocks
 		if(rawBlock.isEmptyBlock(false))
 			return;
+
 		// special cases full block
-		if(!compSettings.transposeInput && _data.getNumColumns() == rawBlock.getNumColumns()) {
+		if(!transposed && _data.getNumColumns() == rawBlock.getNumColumns()) {
 			_data.copy(rawBlock);
 			return;
 		}
@@ -101,8 +101,8 @@
 		int n = _colIndexes.length;
 		for(int i = 0; i < m; i++) {
 			for(int j = 0; j < n; j++) {
-				double val = compSettings.transposeInput ? rawBlock.quickGetValue(_colIndexes[j], i) : rawBlock
-					.quickGetValue(i, _colIndexes[j]);
+				double val = transposed ? rawBlock.quickGetValue(_colIndexes[j], i) : rawBlock.quickGetValue(i,
+					_colIndexes[j]);
 				_data.appendValue(i, j, val);
 			}
 		}
@@ -234,6 +234,11 @@
 	}
 
 	@Override
+	public void decompressToBlockSafe(MatrixBlock target, int rl, int ru, int offT, double[] values, boolean safe) {
+		decompressToBlock(target, rl, ru, offT, values);
+	}
+
+	@Override
 	public void decompressToBlock(MatrixBlock target, int[] colIndexTargets) {
 		// empty block, nothing to add to output
 		if(_data.isEmptyBlock(false)) {
@@ -343,8 +348,8 @@
 	}
 
 	@Override
-	public void leftMultBySparseMatrix(int spNrVals, int[] indexes, double[] sparseV, double[] c, int numVals,
-		double[] values, int numRows, int numCols, int row, double[] MaterializedRow) {
+	public void leftMultBySparseMatrix(SparseBlock sb, double[] c, double[] values, int numRows, int numCols, int row,
+		double[] MaterializedRow) {
 		throw new NotImplementedException("Should not be called use other matrix function for uncompressed columns");
 	}
 
@@ -404,7 +409,7 @@
 	}
 
 	@Override
-	public void unaryAggregateOperations(AggregateUnaryOperator op, double[] result, int rl, int ru) {
+	public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result, int rl, int ru) {
 		throw new NotImplementedException("Unimplemented Specific Sub ColGroup Aggregation Operation");
 	}
 
@@ -567,4 +572,10 @@
 		return false;
 	}
 
+	@Override
+	public boolean isDense() {
+		// Even though uncompressed column groups can be sparsely allocated,
+		// they are considered dense in the context of compression.
+		return true;
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
index 442c2ad..54a45d0 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.sysds.runtime.DMLScriptException;
 import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.utils.ABitmap;
@@ -41,7 +42,6 @@
 import org.apache.sysds.runtime.functionobjects.ReduceCol;
 import org.apache.sysds.runtime.functionobjects.ReduceRow;
 import org.apache.sysds.runtime.functionobjects.ValueFunction;
-import org.apache.sysds.runtime.instructions.cp.KahanObject;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.Pair;
 import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
@@ -99,19 +99,20 @@
 		}
 	}
 
-	protected ColGroupValue(int[] colIndices, int numRows, ADictionary dict) {
+	protected ColGroupValue(int[] colIndices, int numRows, ADictionary dict, int[] cachedCounts) {
 		super(colIndices, numRows);
 		_dict = dict;
+		counts = cachedCounts;
 	}
-	
+
 	@Override
 	public void decompressToBlock(MatrixBlock target, int rl, int ru) {
-		decompressToBlock(target,rl,ru,rl);
+		decompressToBlock(target, rl, ru, rl);
 	}
 
 	@Override
 	public void decompressToBlock(MatrixBlock target, int rl, int ru, int offT) {
-		decompressToBlock(target,rl,ru,offT, getValues());
+		decompressToBlock(target, rl, ru, offT, getValues());
 	}
 
 	/**
@@ -175,6 +176,10 @@
 		}
 	}
 
+	public final int[] getCachedCounts() {
+		return counts;
+	}
+
 	/**
 	 * Returns the counts of values inside the MatrixBlock returned in getValuesAsBlock Throws an exception if the
 	 * getIfCountsType is false.
@@ -224,13 +229,15 @@
 	}
 
 	protected final double sumValuesSparse(int valIx, SparseRow[] rows, double[] dictVals, int rowsIndex) {
-		final int numCols = getNumCols();
-		final int valOff = valIx * numCols;
-		double val = 0;
-		for(int i = 0; i < numCols; i++) {
-			val += dictVals[valOff + i] * rows[i].values()[rowsIndex];
-		}
-		return val;
+		throw new NotImplementedException("This Method was implemented incorrectly");
+		// final int numCols = getNumCols();
+		// final int valOff = valIx * numCols;
+		// double val = 0;
+		// for(int i = 0; i < numCols; i++) {
+		// // TODO FIX ?
+		// val += dictVals[valOff + i] * rows[i].values()[rowsIndex];
+		// }
+		// return val;
 	}
 
 	protected final double[] preaggValues(int numVals, double[] b, double[] dictVals) {
@@ -294,33 +301,37 @@
 	public double[] preaggValues(final int numVals, final double[] b, double[] dictVals, final int cl, final int cu,
 		final int cut) {
 
-		final double[] ret = allocDVector(numVals * (cu - cl), true);
+		final double[] ret = new double[numVals * (cu - cl)];
 
 		return preaggValues(numVals, b, dictVals, cl, cu, cut, ret);
 	}
 
 	public double[] preaggValues(final int numVals, final SparseBlock b, double[] dictVals, final int cl, final int cu,
 		final int cut, final double[] ret) {
+		// There is currently an error here with regard to cl and cu, which are not in use:
+		// if cl and cu are set to anything other than cl = 0 and cu = the number of columns in b, the code will crash.
 
-		final int retRows = (cu - cl);
-		for(int h = 0; h < _colIndexes.length; h++) {
-			SparseRow row = b.get(_colIndexes[h]);
-			// SparseRow row = b[_colIndexes[h]];
-			for(int i = 0; i < row.size(); i++) {
-				double v = row.values()[i];
-				for(int k = h, off = row.indexes()[i];
-					k < numVals * _colIndexes.length;
-					k += _colIndexes.length, off += retRows) {
-					ret[off] += dictVals[k] * v;
+		final int retCols = (cu - cl);
+		
+		for(int h = 0; h< _colIndexes.length; h++){
+			int colIdx = _colIndexes[h];
+			if(!b.isEmpty(colIdx)){
+				double[] sValues = b.values(colIdx);
+				int[] sIndexes = b.indexes(colIdx);
+				for(int i = b.pos(colIdx); i < b.size(colIdx) + b.pos(colIdx); i++) {
+					for(int j = 0, offOrg = h; j< numVals * retCols; j+= retCols, offOrg += _colIndexes.length){
+						ret[j + sIndexes[i]] += dictVals[offOrg] * sValues[i];
+					}
 				}
 			}
 		}
+
 		return ret;
 	}
 
 	public double[] preaggValues(final int numVals, final SparseBlock b, double[] dictVals, final int cl, final int cu,
 		final int cut) {
-		return preaggValues(numVals, b, dictVals, cl, cu, cut, allocDVector(numVals * (cu - cl), true));
+		return preaggValues(numVals, b, dictVals, cl, cu, cut, new double[numVals * (cu - cl)]);
 	}
 
 	protected final double[] preaggValue(int k, double[] b, double[] dictVals, int cl, int cu, int cut) {
@@ -402,18 +413,18 @@
 	 * @param sparseSafe Specify if the operation is sparseSafe. if false then allocate a new tuple.
 	 * @return The new Dictionary with values.
 	 */
-	protected ADictionary applyBinaryRowOp(ValueFunction fn, double[] v, boolean sparseSafe) {
+	public ADictionary applyBinaryRowOp(ValueFunction fn, double[] v, boolean sparseSafe) {
 		return sparseSafe ? _dict.clone().applyBinaryRowOp(fn, v, sparseSafe, _colIndexes) : _dict
 			.applyBinaryRowOp(fn, v, sparseSafe, _colIndexes);
 	}
 
 	@Override
-	public void unaryAggregateOperations(AggregateUnaryOperator op, double[] c) {
+	public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock c) {
 		unaryAggregateOperations(op, c, 0, _numRows);
 	}
 
 	@Override
-	public void unaryAggregateOperations(AggregateUnaryOperator op, double[] c, int rl, int ru) {
+	public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock c, int rl, int ru) {
 		// sum and sumsq (reduceall/reducerow over tuples and counts)
 		if(op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq ||
 			op.aggOp.increOp.fn instanceof Mean) {
@@ -423,11 +434,11 @@
 			boolean mean = op.aggOp.increOp.fn instanceof Mean;
 
 			if(op.indexFn instanceof ReduceAll)
-				computeSum(c, kplus);
+				computeSum(c.getDenseBlockValues(), kplus);
 			else if(op.indexFn instanceof ReduceCol)
-				computeRowSums(c, kplus, rl, ru, mean);
+				computeRowSums(c.getDenseBlockValues(), kplus, rl, ru, mean);
 			else if(op.indexFn instanceof ReduceRow)
-				computeColSums(c, kplus);
+				computeColSums(c.getDenseBlockValues(), kplus);
 		}
 		// min and max (reduceall/reducerow over tuples only)
 		else if(op.aggOp.increOp.fn instanceof Builtin &&
@@ -436,22 +447,27 @@
 			Builtin builtin = (Builtin) op.aggOp.increOp.fn;
 
 			if(op.indexFn instanceof ReduceAll)
-				c[0] = computeMxx(c[0], builtin);
+				c.getDenseBlockValues()[0] = computeMxx(c.getDenseBlockValues()[0], builtin);
 			else if(op.indexFn instanceof ReduceCol)
 				computeRowMxx(c, builtin, rl, ru);
 			else if(op.indexFn instanceof ReduceRow)
-				computeColMxx(c, builtin);
+				computeColMxx(c.getDenseBlockValues(), builtin);
 		}
 		else {
 			throw new DMLScriptException("Unknown UnaryAggregate operator on CompressedMatrixBlock");
 		}
 	}
 
-	protected void setandExecute(double[] c, KahanObject kbuff, KahanPlus kplus2, double val, int rix) {
-		kbuff.set(c[rix], c[rix + 1]);
-		kplus2.execute2(kbuff, val);
-		c[rix] = kbuff._sum;
-		c[rix + 1] = kbuff._correction;
+	protected void setandExecute(double[] c, KahanPlus kplus2, double val, int rix) {
+		if(kplus2 instanceof KahanPlus) {
+			// normal plus
+			// kbuff.set(c[rix], c[rix + 1]);
+			// kplus2.execute2(kbuff, val);
+			c[rix] += val;
+		}
+		else {
+			c[rix] += val * val;
+		}
 	}
 
 	public static void setupThreadLocalMemory(int len) {
@@ -469,8 +485,10 @@
 		Pair<int[], double[]> p = memPool.get();
 
 		// sanity check for missing setup
-		if(p.getValue() == null)
+		if(p.getValue() == null) {
+			LOG.error("Mempool was not allocated!!!");
 			return new double[len];
+		}
 
 		// get and reset if necessary
 		double[] tmp = p.getValue();
@@ -500,7 +518,6 @@
 		sb.append(String.format("\n%15s%5d ", "Columns:", _colIndexes.length));
 		sb.append(Arrays.toString(_colIndexes));
 		sb.append(String.format("\n%15s%5d ", "Values:", _dict.getValues().length));
-		sb.append("\n");
 		_dict.getString(sb, _colIndexes.length);
 		return sb.toString();
 	}
@@ -566,7 +583,7 @@
 
 	protected abstract void computeColSums(double[] c, KahanFunction kplus);
 
-	protected abstract void computeRowMxx(double[] c, Builtin builtin, int rl, int ru);
+	protected abstract void computeRowMxx(MatrixBlock c, Builtin builtin, int rl, int ru);
 
 	protected Object clone() throws CloneNotSupportedException {
 		return super.clone();
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/Dictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/Dictionary.java
index ea9527e..320cd84 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/Dictionary.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/Dictionary.java
@@ -50,18 +50,18 @@
 
 	@Override
 	public double[] getValues() {
-		return _values;
+		return (_values == null) ? new double[0]: _values;
 	}
 
 	@Override
 	public double getValue(int i) {
-		return (i >= _values.length) ? 0.0 : _values[i];
+		return (i >= size()) ? 0.0 : _values[i];
 	}
 
 	@Override
 	public long getInMemorySize() {
 		// object + values array + double
-		return getInMemorySize(_values.length);
+		return getInMemorySize(size());
 	}
 
 	protected static long getInMemorySize(int valuesCount) {
@@ -70,11 +70,13 @@
 	}
 
 	@Override
-	public int hasZeroTuple(int ncol) {
-		int len = _values.length / ncol;
-		for(int i = 0, off = 0; i < len; i++, off += ncol) {
+	public int hasZeroTuple(int nCol) {
+		if(_values == null)
+			return -1;
+		int len = getNumberOfValues(nCol);
+		for(int i = 0, off = 0; i < len; i++, off += nCol) {
 			boolean allZeros = true;
-			for(int j = 0; j < ncol; j++)
+			for(int j = 0; j < nCol; j++)
 				allZeros &= (_values[off + j] == 0);
 			if(allZeros)
 				return i;
@@ -85,7 +87,7 @@
 	@Override
 	public double aggregate(double init, Builtin fn) {
 		// full aggregate can disregard tuple boundaries
-		int len = _values.length;
+		int len = size();
 		double ret = init;
 		for(int i = 0; i < len; i++)
 			ret = fn.execute(ret, _values[i]);
@@ -95,7 +97,7 @@
 	@Override
 	public Dictionary apply(ScalarOperator op) {
 		// in-place modification of the dictionary
-		int len = _values.length;
+		int len = size();
 		for(int i = 0; i < len; i++)
 			_values[i] = op.executeScalar(_values[i]);
 		return this;
@@ -114,7 +116,7 @@
 
 	@Override
 	public Dictionary applyBinaryRowOp(ValueFunction fn, double[] v, boolean sparseSafe, int[] colIndexes) {
-		final int len = _values.length;
+		final int len = size();
 		final int lenV = colIndexes.length;
 		if(sparseSafe) {
 			for(int i = 0; i < len; i++) {
@@ -141,10 +143,12 @@
 	}
 
 	@Override
-	public int getValuesLength() {
-		return _values.length;
+	public Dictionary cloneAndExtend(int len) {
+		double[] ret = Arrays.copyOf(_values, _values.length + len);
+		return new Dictionary(ret);
 	}
 
+
 	public static Dictionary read(DataInput in) throws IOException {
 		int numVals = in.readInt();
 		// read distinct values
@@ -156,47 +160,63 @@
 
 	@Override
 	public void write(DataOutput out) throws IOException {
-		out.writeInt(_values.length);
-		for(int i = 0; i < _values.length; i++)
+		out.writeInt(size());
+		for(int i = 0; i < size(); i++)
 			out.writeDouble(_values[i]);
 	}
 
 	@Override
 	public long getExactSizeOnDisk() {
-		return 4 + 8 * _values.length;
+		return 4 + 8 * size();
+	}
+
+	public int size(){
+		return (_values == null) ? 0 : _values.length;
 	}
 
 	@Override
-	public int getNumberOfValues(int ncol) {
-		return _values.length / ncol;
+	public int getNumberOfValues(int nCol) {
+		return (_values == null) ? 0 : _values.length / nCol;
 	}
 
 	@Override
-	protected double[] sumAllRowsToDouble(KahanFunction kplus, KahanObject kbuff, int nrColumns) {
+	protected double[] sumAllRowsToDouble(KahanFunction kplus, int nrColumns) {
 		if(nrColumns == 1 && kplus instanceof KahanPlus)
 			return getValues(); // shallow copy of values
 
 		// pre-aggregate value tuple
-		final int numVals = _values.length / nrColumns;
+		final int numVals = getNumberOfValues(nrColumns);
 		double[] ret = ColGroupValue.allocDVector(numVals, false);
 		for(int k = 0; k < numVals; k++) {
-			ret[k] = sumRow(k, kplus, kbuff, nrColumns);
+			ret[k] = sumRow(k, kplus, nrColumns);
 		}
 
 		return ret;
 	}
 
 	@Override
-	protected double sumRow(int k, KahanFunction kplus, KahanObject kbuff, int nrColumns) {
-		kbuff.set(0, 0);
+	protected double sumRow(int k, KahanFunction kplus, int nrColumns) {
+		if(_values == null)
+			return 0;
 		int valOff = k * nrColumns;
-		for(int i = 0; i < nrColumns; i++)
-			kplus.execute2(kbuff, _values[valOff + i]);
-		return kbuff._sum;
+		double res = 0.0;
+		if(kplus instanceof KahanPlus) {
+			for(int i = 0; i < nrColumns; i++) {
+				res += _values[valOff + i];
+			}
+		}
+		else {
+			// kSquare
+			for(int i = 0; i < nrColumns; i++)
+				res += _values[valOff + i] * _values[valOff + i];
+		}
+		return res;
 	}
 
 	@Override
 	protected void colSum(double[] c, int[] counts, int[] colIndexes, KahanFunction kplus) {
+		if(_values == null)
+			return;
 		KahanObject kbuff = new KahanObject(0, 0);
 		int valOff = 0;
 		final int rows = c.length / 2;
@@ -215,6 +235,8 @@
 
 	@Override
 	protected double sum(int[] counts, int ncol, KahanFunction kplus) {
+		if(_values == null)
+			return 0;
 		KahanObject kbuff = new KahanObject(0, 0);
 		int valOff = 0;
 		for(int k = 0; k < _values.length / ncol; k++) {
@@ -235,11 +257,16 @@
 		return sb.toString();
 	}
 
-	public StringBuilder getString(StringBuilder sb, int colIndexes){
-		for(int i = 0; i< _values.length; i++){
+	public StringBuilder getString(StringBuilder sb, int colIndexes) {
+		sb.append("[");
+		for(int i = 0; i < _values.length-1; i++) {
 			sb.append(_values[i]);
-			sb.append((i) % (colIndexes ) == colIndexes - 1  ? "\n" : " ");
+			sb.append((i) % (colIndexes) == colIndexes - 1 ? " : " : ", ");
 		}
+		if(_values != null && _values.length > 1){
+			sb.append(_values[_values.length-1]);
+		}
+		sb.append("]");
 		return sb;
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/QDictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/QDictionary.java
index 0105e7b..ac4035f 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/QDictionary.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/QDictionary.java
@@ -62,7 +62,9 @@
 
 	@Override
 	public double[] getValues() {
-		// TODO: use a temporary double array for this.
+		if(_values == null){
+			return new double[0];
+		}
 		double[] res = new double[_values.length];
 		for(int i = 0; i < _values.length; i++) {
 			res[i] = getValue(i);
@@ -72,7 +74,7 @@
 
 	@Override
 	public double getValue(int i) {
-		return (i >= _values.length) ? 0.0 : _values[i] * _scale;
+		return (i >= size()) ? 0.0 : _values[i] * _scale;
 	}
 
 	public byte getValueByte(int i) {
@@ -90,7 +92,7 @@
 	@Override
 	public long getInMemorySize() {
 		// object + values array + double
-		return getInMemorySize(_values.length);
+		return getInMemorySize(size());
 	}
 
 	public static long getInMemorySize(int valuesCount) {
@@ -99,11 +101,13 @@
 	}
 
 	@Override
-	public int hasZeroTuple(int ncol) {
-		int len = _values.length / ncol;
-		for(int i = 0, off = 0; i < len; i++, off += ncol) {
+	public int hasZeroTuple(int nCol) {
+		if(_values == null)
+			return -1;
+		int len = getNumberOfValues(nCol);
+		for(int i = 0, off = 0; i < len; i++, off += nCol) {
 			boolean allZeros = true;
-			for(int j = 0; j < ncol; j++)
+			for(int j = 0; j < nCol; j++)
 				allZeros &= (_values[off + j] == 0);
 			if(allZeros)
 				return i;
@@ -114,7 +118,7 @@
 	@Override
 	public double aggregate(double init, Builtin fn) {
 		// full aggregate can disregard tuple boundaries
-		int len = _values.length;
+		int len = size();
 		double ret = init;
 		for(int i = 0; i < len; i++)
 			ret = fn.execute(ret, getValue(i));
@@ -123,6 +127,8 @@
 
 	@Override
 	public QDictionary apply(ScalarOperator op) {
+		if(_values == null)
+			return this;
 
 		if(op.fn instanceof Multiply || op.fn instanceof Divide) {
 			_scale = op.executeScalar(_scale);
@@ -162,7 +168,7 @@
 	public QDictionary applyScalarOp(ScalarOperator op, double newVal, int numCols) {
 		double[] temp = getValues();
 		double max = Math.abs(newVal);
-		for(int i = 0; i < _values.length; i++) {
+		for(int i = 0; i < size(); i++) {
 			temp[i] = op.executeScalar(temp[i]);
 			double absTemp = Math.abs(temp[i]);
 			if(absTemp > max) {
@@ -170,22 +176,30 @@
 			}
 		}
 		double scale = max / (double) (Byte.MAX_VALUE);
-		byte[] res = new byte[_values.length + numCols];
-		for(int i = 0; i < _values.length; i++) {
+		byte[] res = new byte[size() + numCols];
+		for(int i = 0; i < size(); i++) {
 			res[i] = (byte) Math.round(temp[i] / scale);
 		}
-		Arrays.fill(res, _values.length, _values.length + numCols, (byte) Math.round(newVal / scale));
+		Arrays.fill(res, size(), size() + numCols, (byte) Math.round(newVal / scale));
 		return new QDictionary(res, scale);
 	}
 
 	@Override
 	public QDictionary applyBinaryRowOp(ValueFunction fn, double[] v, boolean sparseSafe, int[] colIndexes) {
-		// TODO Use a temporary double array for this.
+	
+		if (_values == null){
+			if (sparseSafe){
+				return new QDictionary(null, 1);
+			} else{
+				_values = new byte[0];
+			}
+		}
+			
 		double[] temp = sparseSafe ? new double[_values.length] : new double[_values.length + colIndexes.length];
 		double max = Math.abs(fn.execute(0, v[0]));
 		final int colL = colIndexes.length;
 		int i = 0;
-		for(; i < _values.length; i++) {
+		for(; i < size(); i++) {
 			temp[i] = fn.execute(_values[i] * _scale, v[colIndexes[i % colL]]);
 			double absTemp = Math.abs(temp[i]);
 			if(absTemp > max) {
@@ -193,7 +207,7 @@
 			}
 		}
 		if(!sparseSafe)
-			for(; i < _values.length + colL; i++) {
+			for(; i <size() + colL; i++) {
 				temp[i] = fn.execute(0, v[colIndexes[i % colL]]);
 				double absTemp = Math.abs(temp[i]);
 				if(absTemp > max) {
@@ -202,7 +216,7 @@
 			}
 
 		double scale = max / (double) (Byte.MAX_VALUE);
-		byte[] res = sparseSafe ? _values : new byte[_values.length + colIndexes.length];
+		byte[] res = sparseSafe ? _values : new byte[size() + colIndexes.length];
 
 		for(i = 0; i < temp.length; i++) {
 			res[i] = (byte) Math.round(temp[i] / scale);
@@ -211,8 +225,8 @@
 	}
 
 	@Override
-	public int getValuesLength() {
-		return _values.length;
+	public int size(){
+		return _values == null ? 0 : _values.length;
 	}
 
 	@Override
@@ -220,11 +234,17 @@
 		return new QDictionary(_values.clone(), _scale);
 	}
 
+	@Override
+	public QDictionary cloneAndExtend(int len) {
+		byte[] ret = Arrays.copyOf(_values, _values.length + len);
+		return new QDictionary(ret, _scale);
+	}
+
 	public static QDictionary read(DataInput in) throws IOException {
 		double scale = in.readDouble();
 		int numVals = in.readInt();
 		// read distinct values
-		byte[] values = new byte[numVals];
+		byte[] values = numVals == 0 ? null : new byte[numVals];
 		for(int i = 0; i < numVals; i++)
 			values[i] = in.readByte();
 		return new QDictionary(values, scale);
@@ -240,43 +260,46 @@
 
 	@Override
 	public long getExactSizeOnDisk() {
-		return 8 + 4 + _values.length;
+		return 8 + 4 + size();
 	}
 
 	@Override
 	public int getNumberOfValues(int nCol) {
-		return _values.length / nCol;
+		return (_values == null) ? 0 : _values.length / nCol;
 	}
 
 	@Override
-	protected double[] sumAllRowsToDouble(KahanFunction kplus, KahanObject kbuff, int nrColumns) {
+	protected double[] sumAllRowsToDouble(KahanFunction kplus, int nrColumns) {
 		if(nrColumns == 1 && kplus instanceof KahanPlus)
 			return getValues(); // shallow copy of values
 
-		final int numVals = _values.length / nrColumns;
+		final int numVals =  getNumberOfValues(nrColumns);
 		double[] ret = ColGroupValue.allocDVector(numVals, false);
 		for(int k = 0; k < numVals; k++) {
-			ret[k] = sumRow(k, kplus, kbuff, nrColumns);
+			ret[k] = sumRow(k, kplus, nrColumns);
 		}
 
 		return ret;
 	}
 
 	@Override
-	protected double sumRow(int k, KahanFunction kplus, KahanObject kbuff, int nrColumns) {
+	protected double sumRow(int k, KahanFunction kplus, int nrColumns) {
+		if (_values == null) return 0;
 		int valOff = k * nrColumns;
+		
 		if(kplus instanceof KahanPlus) {
-			short res = 0;
+			int res = 0;
 			for(int i = 0; i < nrColumns; i++) {
 				res += _values[valOff + i];
 			}
 			return res * _scale;
 		}
 		else {
-			kbuff.set(0, 0);
+			// kSquare
+			double res = 0.0;
 			for(int i = 0; i < nrColumns; i++)
-				kplus.execute2(kbuff, _values[valOff + i] * _scale);
-			return kbuff._sum;
+				res += (int) (_values[valOff + i] * _values[valOff + i]) * _scale * _scale;
+			return res;
 		}
 	}
 
@@ -287,7 +310,7 @@
 		if(!(kplus instanceof KahanPlusSq)) {
 			int[] sum = new int[colIndexes.length];
 			int valOff = 0;
-			for(int k = 0; k < _values.length / colIndexes.length; k++) {
+			for(int k = 0; k < getNumberOfValues(colIndexes.length); k++) {
 				int cntk = counts[k];
 				for(int j = 0; j < colIndexes.length; j++) {
 					sum[j] += cntk * getValueByte(valOff++);
@@ -300,7 +323,7 @@
 		else {
 			KahanObject kbuff = new KahanObject(0, 0);
 			int valOff = 0;
-			for(int k = 0; k < _values.length / colIndexes.length; k++) {
+			for(int k = 0; k < getNumberOfValues(colIndexes.length); k++) {
 				int cntk = counts[k];
 				for(int j = 0; j < colIndexes.length; j++) {
 					kbuff.set(c[colIndexes[j]], c[colIndexes[j] + rows]);
@@ -317,7 +340,7 @@
 		if(!(kplus instanceof KahanPlusSq)) {
 			int sum = 0;
 			int valOff = 0;
-			for(int k = 0; k < _values.length / ncol; k++) {
+			for(int k = 0; k < getNumberOfValues(ncol); k++) {
 				int countK = counts[k];
 				for(int j = 0; j < ncol; j++) {
 					sum += countK * getValueByte(valOff++);
@@ -328,7 +351,7 @@
 		else {
 			KahanObject kbuff = new KahanObject(0, 0);
 			int valOff = 0;
-			for(int k = 0; k < _values.length / ncol; k++) {
+			for(int k = 0; k < getNumberOfValues(ncol); k++) {
 				int countK = counts[k];
 				for(int j = 0; j < ncol; j++) {
 					kplus.execute3(kbuff, getValue(valOff++), countK);
@@ -339,7 +362,7 @@
 	}
 
 	public StringBuilder getString(StringBuilder sb, int colIndexes) {
-		for(int i = 0; i < _values.length; i++) {
+		for(int i = 0; i < size(); i++) {
 			sb.append(_values[i]);
 			sb.append((i) % (colIndexes) == colIndexes - 1 ? "\n" : " ");
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
index f7675ae..b17e44a 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
@@ -45,13 +45,19 @@
 	/** The Matrix Block to extract the compression estimates from */
 	protected MatrixBlock _data;
 	/** The number of rows in the matrix block, extracted to a field because the matrix could be transposed */
-	protected final int _numRows;
+	protected int _numRows;
 	/** The number of columns in the matrix block, extracted to a field because the matrix could be transposed */
-	protected final int _numCols;
+	protected int _numCols;
 	/** The compression settings to use, for estimating the size, and compress the ColGroups. */
 	protected final CompressionSettings _compSettings;
 
 	/**
+	 * Boolean specifying whether _data is in transposed format. This is used to select the correct readers for
+	 * extracting bitmaps for the columns.
+	 */
+	protected boolean _transposed = false;
+
+	/**
 	 * Main Constructor for Compression Estimator.
 	 * 
 	 * protected because the factory should be used to construct the CompressedSizeEstimator
@@ -59,10 +65,11 @@
 	 * @param data         The matrix block to extract information from
 	 * @param compSettings The Compression settings used.
 	 */
-	protected CompressedSizeEstimator(MatrixBlock data, CompressionSettings compSettings) {
+	protected CompressedSizeEstimator(MatrixBlock data, CompressionSettings compSettings, boolean transposed) {
 		_data = data;
-		_numRows = compSettings.transposeInput ? _data.getNumColumns() : _data.getNumRows();
-		_numCols = compSettings.transposeInput ? _data.getNumRows() : _data.getNumColumns();
+		_transposed = transposed;
+		_numRows = _transposed ? _data.getNumColumns() : _data.getNumRows();
+		_numCols = _transposed ? _data.getNumRows() : _data.getNumColumns();
 		_compSettings = compSettings;
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
index 00bc011..a16b228 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
@@ -29,13 +29,13 @@
  */
 public class CompressedSizeEstimatorExact extends CompressedSizeEstimator {
 
-	public CompressedSizeEstimatorExact(MatrixBlock data, CompressionSettings compSettings) {
-		super(data, compSettings);
+	public CompressedSizeEstimatorExact(MatrixBlock data, CompressionSettings compSettings, boolean transposed) {
+		super(data, compSettings, transposed);
 	}
 
 	@Override
 	public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes) {
-		ABitmap entireBitMap = BitmapEncoder.extractBitmap(colIndexes, _data, _compSettings);
+		ABitmap entireBitMap = BitmapEncoder.extractBitmap(colIndexes, _data, _transposed);
 		return new CompressedSizeInfoColGroup(estimateCompressedColGroupSize(entireBitMap),
 			_compSettings.validCompressions);
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
index 5003a75..e44a26a 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
@@ -19,17 +19,37 @@
 
 package org.apache.sysds.runtime.compress.estim;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
 public class CompressedSizeEstimatorFactory {
+	protected static final Log LOG = LogFactory.getLog(CompressedSizeEstimatorFactory.class.getName());
 
-	public static CompressedSizeEstimator getSizeEstimator(MatrixBlock data, CompressionSettings compSettings) {
-		long elements = compSettings.transposeInput ? data.getNumColumns() : data.getNumRows();
-		elements = data.getNonZeros() / (compSettings.transposeInput ? data.getNumRows() : data.getNumColumns());
+	private static final int minimumSampleSize = 2000;
 
-		return (compSettings.samplingRatio >= 1.0 || elements < 1000) ? new CompressedSizeEstimatorExact(data,
-			compSettings) : new CompressedSizeEstimatorSample(data, compSettings,
-				(int) Math.ceil(elements * compSettings.samplingRatio));
+	public static CompressedSizeEstimator getSizeEstimator(MatrixBlock data, CompressionSettings compSettings,
+		boolean transposed) {
+		long elements = transposed ? data.getNumColumns() : data.getNumRows();
+		elements = data.getNonZeros() / (transposed ? data.getNumRows() : data.getNumColumns());
+		CompressedSizeEstimator est;
+
+		// Calculate the sample size.
+		// If the sample size is very small, set it to the minimum size
+		int sampleSize = Math.max((int) Math.ceil(elements * compSettings.samplingRatio), minimumSampleSize);
+		if(compSettings.samplingRatio >= 1.0 || elements < minimumSampleSize || sampleSize > elements) {
+			est = new CompressedSizeEstimatorExact(data, compSettings, transposed);
+		}
+		else {
+			int[] sampleRows = CompressedSizeEstimatorSample.getSortedUniformSample(
+				transposed ? data.getNumColumns() : data.getNumRows(),
+				sampleSize,
+				compSettings.seed);
+				est = new CompressedSizeEstimatorSample(data, compSettings, sampleRows, transposed);
+		}
+
+		LOG.debug(est);
+		return est;
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
index e40035c..3a5b703 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
@@ -26,51 +26,66 @@
 import org.apache.sysds.runtime.compress.estim.sample.HassAndStokes;
 import org.apache.sysds.runtime.compress.utils.ABitmap;
 import org.apache.sysds.runtime.compress.utils.ABitmap.BitmapType;
+import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.UtilFunctions;
 
 public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
 
-	private int[] _sampleRows = null;
+	private static final int FORCE_TRANSPOSE_ON_SAMPLE_THRESHOLD = 8000;
+
+	private final int[] _sampleRows;
 	private HashMap<Integer, Double> _solveCache = null;
 
	/**
	 * CompressedSizeEstimatorSample, samples from the input data and estimates the size of the compressed matrix.
	 *
	 * @param data         The input data to sample from
	 * @param compSettings The Settings used for the sampling, and compression, contains information such as seed.
	 * @param sampleRows   The rows sampled
	 * @param transposed   Boolean specifying if the input is already transposed.
	 */
	public CompressedSizeEstimatorSample(MatrixBlock data, CompressionSettings compSettings, int[] sampleRows,
		boolean transposed) {
		super(data, compSettings, transposed);
		_sampleRows = sampleRows;
		// Estimator-local cache for the numeric solve in distinct-value estimation.
		_solveCache = new HashMap<>();
		// NOTE: sampleData may flip _transposed when the sample is large enough
		// that a transposed representation is more efficient.
		_data = sampleData(data, compSettings, sampleRows, transposed);
	}
+
+	protected MatrixBlock sampleData(MatrixBlock data, CompressionSettings compSettings, int[] sampleRows,
+		boolean transposed) {
+		// Override the _data Matrix block with the sampled matrix block.
+		MatrixBlock select = (transposed) ? new MatrixBlock(data.getNumColumns(), 1,
+			true) : new MatrixBlock(data.getNumRows(), 1, true);
+		for(int i = 0; i < sampleRows.length; i++)
+			select.appendValue(sampleRows[i], 0, 1);
+
+		MatrixBlock sampledMatrixBlock = data.removeEmptyOperations(new MatrixBlock(), !transposed, true, select);
+		if(!transposed && sampleRows.length > FORCE_TRANSPOSE_ON_SAMPLE_THRESHOLD) {
+			_transposed = true;
+			sampledMatrixBlock = LibMatrixReorg
+				.transpose(sampledMatrixBlock, new MatrixBlock(sampleRows.length, data.getNumRows(), true), 1);
+		}
+
+		return sampledMatrixBlock;
+
 	}
 
 	@Override
 	public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes) {
 		int sampleSize = _sampleRows.length;
 		int numCols = colIndexes.length;
-		int[] sampleRows = _sampleRows;
 
 		// extract statistics from sample
-		ABitmap ubm = BitmapEncoder.extractBitmap(colIndexes, _data, _compSettings);
+		ABitmap ubm = BitmapEncoder.extractBitmap(colIndexes, _data, _transposed);
 		EstimationFactors fact = EstimationFactors.computeSizeEstimationFactors(ubm, false, _numRows, numCols);
 
 		// estimate number of distinct values (incl fixes for anomalies w/ large sample fraction)
 		// TODO Replace this with lib matrix/data/LibMatrixCountDistinct
-		int totalCardinality = getNumDistinctValues(ubm, _numRows, sampleRows, _solveCache);
+		int totalCardinality = getNumDistinctValues(ubm, _numRows, sampleSize, _solveCache);
 		totalCardinality = Math.max(totalCardinality, fact.numVals);
-		totalCardinality =  _compSettings.lossy ? Math.min(totalCardinality, numCols * 127) : totalCardinality;
+		totalCardinality = _compSettings.lossy ? Math.min(totalCardinality, numCols * 127) : totalCardinality;
 		totalCardinality = Math.min(totalCardinality, _numRows);
 
 		// Number of unseen values
@@ -88,7 +103,7 @@
 		// estimate number of segments and number of runs incl correction for
 		// empty segments and empty runs (via expected mean of offset value)
 		// int numUnseenSeg = (int) (unseenVals * Math.ceil((double) _numRows / BitmapEncoder.BITMAP_BLOCK_SZ / 2));
-		int totalNumRuns = ubm.getNumValues() > 0 ? getNumRuns(ubm, sampleSize, _numRows, sampleRows) : 0;
+		int totalNumRuns = ubm.getNumValues() > 0 ? getNumRuns(ubm, sampleSize, _numRows, _sampleRows) : 0;
 
 		boolean containsZero = numZeros > 0;
 
@@ -99,9 +114,9 @@
 		return new CompressedSizeInfoColGroup(totalFacts, _compSettings.validCompressions);
 	}
 
	private static int getNumDistinctValues(ABitmap ubm, int numRows, int sampleSize,
		HashMap<Integer, Double> solveCache) {
		// Haas & Stokes estimator for the number of distinct values in the full data,
		// extrapolated from the sample; solveCache carries state across calls (see HassAndStokes).
		return HassAndStokes.haasAndStokes(ubm, numRows, sampleSize, solveCache);
	}
 
 	private static int getNumRuns(ABitmap ubm, int sampleSize, int totalNumRows, int[] sampleRows) {
@@ -269,7 +284,22 @@
 	 * @param smplSize sample size
 	 * @return sorted array of integers
 	 */
	protected static int[] getSortedUniformSample(int range, int smplSize, long seed) {
		// Delegates to the shared util; returns a sorted array of sampled indexes in [0, range).
		return UtilFunctions.getSortedSampleIndexes(range, smplSize, seed);
	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append(this.getClass().getSimpleName());
+		sb.append(" sampleSize: ");
+		sb.append(_sampleRows.length);
+		sb.append(" transposed: ");
+		sb.append(_transposed);
+		sb.append(" cols: ");
+		sb.append(_numCols);
+		sb.append(" rows: ");
+		sb.append(_numRows);
+		return sb.toString();
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfo.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfo.java
index c920772..f3362a1 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfo.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfo.java
@@ -64,4 +64,13 @@
 		return est;
 	}
 
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append("CompressedSizeInfo");
+		sb.append("\n  - CompressableColumns: " + (colsC) + " UncompressableColumns: " + (colsUC));
+		sb.append("\n  - CompressionRatio: " + compRatios);
+		sb.append("\n  - nnzUC: " + nnzUC);
+		return sb.toString();
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
index cf89e8c..da49753 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
@@ -123,4 +123,13 @@
 		}
 		return size;
 	}
+
+	@Override
+	public String toString(){
+		StringBuilder sb = new StringBuilder();
+
+		sb.append(_bestCompressionType);
+		sb.append("\n" + _sizes);
+		return sb.toString();
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/EstimationFactors.java b/src/main/java/org/apache/sysds/runtime/compress/estim/EstimationFactors.java
index 0c0d7f9..3ad8224 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/EstimationFactors.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/EstimationFactors.java
@@ -62,8 +62,9 @@
 
 	protected static EstimationFactors computeSizeEstimationFactors(ABitmap ubm, boolean inclRLE, int numRows,
 		int numCols) {
-		int numVals = ubm.getNumValues();
-		boolean containsZero = ubm.containsZero();
+		
+		int numVals = (ubm != null) ? ubm.getNumValues(): 0;
+		boolean containsZero = (ubm != null) ? ubm.containsZero() : true;
 
 		int numRuns = 0;
 		int numOffs = 0;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/sample/HassAndStokes.java b/src/main/java/org/apache/sysds/runtime/compress/estim/sample/HassAndStokes.java
index b745d4e..d7b0109 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/sample/HassAndStokes.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/sample/HassAndStokes.java
@@ -46,8 +46,7 @@
 	 * @param solveCache A Hashmap containing information for getDuj2aEstimate
 	 * @return An estimation of distinct elements in the population.
 	 */
-	public static int haasAndStokes(ABitmap ubm, int nRows, int sampleSize,
-		HashMap<Integer, Double> solveCache) {
+	public static int haasAndStokes(ABitmap ubm, int nRows, int sampleSize, HashMap<Integer, Double> solveCache) {
 		// obtain value and frequency histograms
 		int numVals = ubm.getNumValues();
 		int[] freqCounts = FrequencyCount.get(ubm);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibBinaryCellOp.java b/src/main/java/org/apache/sysds/runtime/compress/lib/LibBinaryCellOp.java
index 9a77b50..eb71e36 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibBinaryCellOp.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibBinaryCellOp.java
@@ -38,8 +38,10 @@
 import org.apache.sysds.runtime.compress.colgroup.ColGroup;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupConst;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupValue;
 import org.apache.sysds.runtime.compress.colgroup.Dictionary;
 import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.functionobjects.Divide;
 import org.apache.sysds.runtime.functionobjects.Minus;
 import org.apache.sysds.runtime.functionobjects.Multiply;
@@ -48,9 +50,7 @@
 import org.apache.sysds.runtime.matrix.data.LibMatrixBincell.BinaryAccessType;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.LeftScalarOperator;
 import org.apache.sysds.runtime.matrix.operators.RightScalarOperator;
-import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
 import org.apache.sysds.runtime.util.CommonThreadPool;
 
 public class LibBinaryCellOp {
@@ -68,11 +68,6 @@
 	 */
 	public static MatrixBlock bincellOp(CompressedMatrixBlock m1, MatrixBlock m2, CompressedMatrixBlock ret,
 		BinaryOperator op) {
-		if(op.fn instanceof Minus) {
-			ScalarOperator sop = new RightScalarOperator(Multiply.getMultiplyFnObject(), -1);
-			m2 = m2.scalarOperations(sop, new MatrixBlock());
-			return LibBinaryCellOp.bincellOp(m1, m2, ret, new BinaryOperator(Plus.getPlusFnObject()));
-		}
 
 		BinaryAccessType atype = LibMatrixBincell.getBinaryAccessType(m1, m2);
 
@@ -114,31 +109,55 @@
 		// Apply the operation to each of the column groups.
 		// Most implementations will only modify metadata.
 		List<ColGroup> oldColGroups = m1.getColGroups();
-		List<ColGroup> newColGroups = new ArrayList<>(oldColGroups.size());
 		double[] v = m2.getDenseBlockValues();
+		if(v == null) {
+			SparseBlock sb = m2.getSparseBlock();
+			if(sb == null) {
+				throw new DMLRuntimeException("Unknown matrix block type");
+			}
+			else {
+				// make the row a dense vector...
+				double[] spV = sb.values(0);
+				int[] spI = sb.indexes(0);
+				v = new double[m2.getNumColumns()];
+				for(int i = sb.pos(0); i < sb.size(0); i++) {
+					v[spI[i]] = spV[i];
+				}
+			}
+		}
 		boolean sparseSafe = true;
 		for(double x : v) {
-			if(op.fn.execute(x, 0.0) != 0.0) {
+			if(op.fn.execute(0.0, x) != 0.0) {
 				sparseSafe = false;
 				break;
 			}
 		}
 
-		for(ColGroup grp : oldColGroups) {
-			if(grp instanceof ColGroupUncompressed) {
-				throw new DMLCompressionException("Not supported Binary MV");
-			}
-			else {
-				if(grp.getNumCols() == 1) {
-					ScalarOperator sop = new LeftScalarOperator(op.fn, m2.getValue(0, grp.getColIndices()[0]), 1);
-					newColGroups.add(grp.scalarOperation(sop));
+		List<ColGroup> newColGroups = new ArrayList<>(oldColGroups.size());
+		int k = OptimizerUtils.getConstrainedNumThreads(-1);
+		ExecutorService pool = CommonThreadPool.get(k);
+		ArrayList<BinaryMVRowTask> tasks = new ArrayList<>();
+		try {
+			for(ColGroup grp : oldColGroups) {
+				if(grp instanceof ColGroupUncompressed) {
+					throw new DMLCompressionException("Not supported uncompressed Col Group for Binary MV");
 				}
 				else {
-					ColGroup ncg = grp.binaryRowOp(op, v, sparseSafe);
-					newColGroups.add(ncg);
+					tasks.add(new BinaryMVRowTask(grp, v, sparseSafe, op));
+
 				}
 			}
+
+			for(Future<ColGroup> f : pool.invokeAll(tasks))
+				newColGroups.add(f.get());
+
+			pool.shutdown();
 		}
+		catch(InterruptedException | ExecutionException e) {
+			e.printStackTrace();
+			throw new DMLRuntimeException(e);
+		}
+
 		ret.allocateColGroupList(newColGroups);
 		ret.setNonZeros(m1.getNumColumns() * m1.getNumRows());
 		return ret;
@@ -148,15 +167,27 @@
 	protected static CompressedMatrixBlock binaryMVPlusStack(CompressedMatrixBlock m1, MatrixBlock m2,
 		CompressedMatrixBlock ret, BinaryOperator op) {
 		List<ColGroup> oldColGroups = m1.getColGroups();
-		List<ColGroup> newColGroups = new ArrayList<>(oldColGroups.size() + 1);
+
+		List<ColGroup> newColGroups = (m2.isEmpty()) ? new ArrayList<>(oldColGroups.size()) : new ArrayList<>(
+			oldColGroups.size() + 1);
+		boolean foundConst = false;
 		for(ColGroup grp : m1.getColGroups()) {
-			newColGroups.add(grp);
+			if(!m2.isEmpty() && !foundConst && grp instanceof ColGroupConst) {
+				ADictionary newDict = ((ColGroupValue) grp).applyBinaryRowOp(op.fn, m2.getDenseBlockValues(), false);
+				newColGroups.add(new ColGroupConst(grp.getColIndices(), m1.getNumRows(), newDict));
+				foundConst = true;
+			}
+			else {
+				newColGroups.add(grp);
+			}
 		}
-		int[] colIndexes = oldColGroups.get(0).getColIndices();
-		double[] v = m2.getDenseBlockValues();
-		ADictionary newDict = new Dictionary(new double[colIndexes.length]);
-		newDict = newDict.applyBinaryRowOp(op.fn, v, true, colIndexes);
-		newColGroups.add(new ColGroupConst(colIndexes, m1.getNumRows(), newDict));
+		if(!m2.isEmpty() && !foundConst) {
+			int[] colIndexes = oldColGroups.get(0).getColIndices();
+			double[] v = m2.getDenseBlockValues();
+			ADictionary newDict = new Dictionary(new double[colIndexes.length]);
+			newDict = newDict.applyBinaryRowOp(op.fn, v, true, colIndexes);
+			newColGroups.add(new ColGroupConst(colIndexes, m1.getNumRows(), newDict));
+		}
 		ret.allocateColGroupList(newColGroups);
 		ret.setOverlapping(true);
 		ret.setNonZeros(-1);
@@ -164,8 +195,12 @@
 	}
 
 	public static MatrixBlock binaryMVPlusCol(CompressedMatrixBlock m1, MatrixBlock m2, BinaryOperator op) {
-		MatrixBlock ret = new MatrixBlock(m1.getNumRows(), m1.getNumColumns(), false, -1).allocateBlock();
+		if(m1.getNumRows() != m2.getNumRows())
+			throw new DMLRuntimeException("Invalid number of rows in input. Should be equal for m1 and m2 but are : "
+				+ m1.getNumRows() + " " + m2.getNumRows());
 
+		MatrixBlock ret = new MatrixBlock(m1.getNumRows(), m1.getNumColumns(), false, -1).allocateBlock();
+		// LOG.error(Arrays.toString(m2.getDenseBlockValues()));
 		final int blkz = CompressionSettings.BITMAP_BLOCK_SZ;
 		int k = OptimizerUtils.getConstrainedNumThreads(-1);
 		ExecutorService pool = CommonThreadPool.get(k);
@@ -173,16 +208,14 @@
 
 		try {
 			for(int i = 0; i * blkz < m1.getNumRows(); i++) {
-				BinaryMVColTask rt = new BinaryMVColTask(m1.getColGroups(), m2, ret, i * blkz,
-					Math.min(m1.getNumRows(), (i + 1) * blkz), op);
-				tasks.add(rt);
+				tasks.add(new BinaryMVColTask(m1.getColGroups(), m2, ret, i * blkz,
+					Math.min(m1.getNumRows(), (i + 1) * blkz), op));
 			}
-			List<Future<Integer>> futures = pool.invokeAll(tasks);
-			pool.shutdown();
 			long nnz = 0;
-			for(Future<Integer> f : futures)
+			for(Future<Integer> f : pool.invokeAll(tasks))
 				nnz += f.get();
 			ret.setNonZeros(nnz);
+			pool.shutdown();
 		}
 		catch(InterruptedException | ExecutionException e) {
 			e.printStackTrace();
@@ -212,23 +245,47 @@
 
 		@Override
 		public Integer call() {
-
+			List<Integer> columns = new ArrayList<>();
 			for(ColGroup g : _groups) {
-				g.decompressToBlock(_ret, _rl, _ru, _rl, g.getValues());
+				// unsafe decompress, since we count nonzeros afterwards.
+				g.decompressToBlockSafe(_ret, _rl, _ru, _rl, g.getValues(), false);
+				for(int i: g.getColIndices()){
+					columns.add(i);
+				}
 			}
 
 			int nnz = 0;
 			DenseBlock db = _ret.getDenseBlock();
 			for(int row = _rl; row < _ru; row++) {
 				double vr = _m2.quickGetValue(row, 0);
-				for(int col = 0; col < _ret.getNumColumns(); col++) {
+				for(int col : columns) {
 					double v = _op.fn.execute(_ret.quickGetValue(row, col), vr);
 					nnz += (v != 0) ? 1 : 0;
 					db.set(row, col, v);
 				}
+
 			}
 
 			return nnz;
 		}
 	}
+
+	private static class BinaryMVRowTask implements Callable<ColGroup> {
+		private final ColGroup _group;
+		private final double[] _v;
+		private final boolean _sparseSafe;
+		private final BinaryOperator _op;
+
+		protected BinaryMVRowTask(ColGroup group, double[] v, boolean sparseSafe, BinaryOperator op) {
+			_group = group;
+			_v = v;
+			_op = op;
+			_sparseSafe = sparseSafe;
+		}
+
+		@Override
+		public ColGroup call() {
+			return _group.binaryRowOp(_op, _v, _sparseSafe);
+		}
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java b/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java
index 940a849..316cf41 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java
@@ -26,6 +26,8 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.compress.CompressionSettings;
@@ -52,7 +54,7 @@
 
 public class LibCompAgg {
 
-    // private static final Log LOG = LogFactory.getLog(LibCompAgg.class.getName());
+    private static final Log LOG = LogFactory.getLog(LibCompAgg.class.getName());
 
     /** Threshold for when to parallelize the aggregation functions. */
     private static final long MIN_PAR_AGG_THRESHOLD = 8 * 1024 * 1024; // 8MB
@@ -88,18 +90,18 @@
                 ArrayList<UnaryAggregateTask> tasks = new ArrayList<>();
                 if(op.indexFn instanceof ReduceCol && grpParts.length > 0) {
                     final int blkz = CompressionSettings.BITMAP_BLOCK_SZ;
-                    int blklen = (int) Math.ceil((double) m1.getNumRows() / op.getNumThreads());
+                    int blklen = Math.min((int) Math.ceil((double) m1.getNumRows() / op.getNumThreads()), blkz / 2);
                     blklen += (blklen % blkz != 0) ? blkz - blklen % blkz : 0;
                     for(int i = 0; i < op.getNumThreads() & i * blklen < m1.getNumRows(); i++) {
                         tasks.add(new UnaryAggregateTask(grpParts[0], ret, i * blklen,
-                            Math.min((i + 1) * blklen, m1.getNumRows()), op));
+                            Math.min((i + 1) * blklen, m1.getNumRows()), op, m1.getNumColumns()));
 
                     }
                 }
                 else
                     for(ArrayList<ColGroup> grp : grpParts) {
                         if(grp != null)
-                            tasks.add(new UnaryAggregateTask(grp, ret, 0, m1.getNumRows(), op));
+                            tasks.add(new UnaryAggregateTask(grp, ret, 0, m1.getNumRows(), op, m1.getNumColumns()));
                     }
                 List<Future<MatrixBlock>> rtasks = pool.invokeAll(tasks);
                 pool.shutdown();
@@ -138,24 +140,23 @@
         }
         else {
             if(m1.getColGroups() != null) {
-
                 for(ColGroup grp : m1.getColGroups())
                     if(grp instanceof ColGroupUncompressed)
                         ((ColGroupUncompressed) grp).unaryAggregateOperations(op, ret);
-                aggregateUnaryOperations(op, m1.getColGroups(), ret, 0, m1.getNumRows());
+                aggregateUnaryOperations(op, m1.getColGroups(), ret, 0, m1.getNumRows(), m1.getNumColumns());
             }
         }
 
         // special handling zeros for rowmins/rowmax
-        if(op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof Builtin) {
-            int[] rnnz = new int[m1.getNumRows()];
-            for(ColGroup grp : m1.getColGroups())
-                grp.countNonZerosPerRow(rnnz, 0, m1.getNumRows());
-            Builtin builtin = (Builtin) op.aggOp.increOp.fn;
-            for(int i = 0; i < m1.getNumRows(); i++)
-                if(rnnz[i] < m1.getNumColumns())
-                    ret.quickSetValue(i, 0, builtin.execute(ret.quickGetValue(i, 0), 0));
-        }
+        // if(op.getNumThreads() == 1 && op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof Builtin) {
+        // int[] rnnz = new int[m1.getNumRows()];
+        // for(ColGroup grp : m1.getColGroups())
+        // grp.countNonZerosPerRow(rnnz, 0, m1.getNumRows());
+        // Builtin builtin = (Builtin) op.aggOp.increOp.fn;
+        // for(int i = 0; i < m1.getNumRows(); i++)
+        // if(rnnz[i] < m1.getNumColumns())
+        // ret.quickSetValue(i, 0, builtin.execute(ret.quickGetValue(i, 0), 0));
+        // }
 
         // special handling of mean
         if(op.aggOp.increOp.fn instanceof Mean) {
@@ -196,8 +197,8 @@
             // compute all compressed column groups
             ExecutorService pool = CommonThreadPool.get(op.getNumThreads());
             ArrayList<UnaryAggregateOverlappingTask> tasks = new ArrayList<>();
-            final int blklen = Math.min(m1.getNumRows() /op.getNumThreads(), CompressionSettings.BITMAP_BLOCK_SZ) ;
-            // final int blklen = CompressionSettings.BITMAP_BLOCK_SZ ;/// m1.getNumColumns();
+            final int blklen = Math.min(m1.getNumRows() / op.getNumThreads(), CompressionSettings.BITMAP_BLOCK_SZ);
+            LOG.error("BlockSize : " + blklen);
 
             for(int i = 0; i * blklen < m1.getNumRows(); i++) {
                 tasks.add(new UnaryAggregateOverlappingTask(m1.getColGroups(), ret, i * blklen,
@@ -228,17 +229,17 @@
 
                 ret.recomputeNonZeros();
             }
-            else if(op.indexFn instanceof ReduceCol) {
-                long nnz = 0;
-                for(int i = 0; i * blklen < m1.getNumRows(); i++) {
-                    MatrixBlock tmp = rtasks.get(i).get();
-                    for(int row = 0, off = i * blklen; row < tmp.getNumRows(); row++, off++) {
-                        ret.quickSetValue(off, 0, tmp.quickGetValue(row, 0));
-                        nnz += ret.quickGetValue(off, 0) == 0 ? 0 : 1;
-                    }
-                }
-                ret.setNonZeros(nnz);
-            }
+            // else if(op.indexFn instanceof ReduceCol) {
+            // long nnz = 0;
+            // for(int i = 0; i * blklen < m1.getNumRows(); i++) {
+            // MatrixBlock tmp = rtasks.get(i).get();
+            // for(int row = 0, off = i * blklen; row < tmp.getNumRows(); row++, off++) {
+            // ret.quickSetValue(off, 0, tmp.quickGetValue(row, 0));
+            // nnz += ret.quickGetValue(off, 0) == 0 ? 0 : 1;
+            // }
+            // }
+            // ret.setNonZeros(nnz);
+            // }
             else {
                 for(Future<MatrixBlock> rtask : rtasks) {
                     LibMatrixBincell.bincellOp(rtask.get(),
@@ -285,13 +286,33 @@
     }
 
     private static void aggregateUnaryOperations(AggregateUnaryOperator op, List<ColGroup> groups, MatrixBlock ret,
-        int rl, int ru) {
+        int rl, int ru, int numColumns) {
 
         // note: UC group never passed into this function
-        double[] c = ret.getDenseBlockValues();
-        for(ColGroup grp : groups)
-            if(grp != null && !(grp instanceof ColGroupUncompressed))
-                grp.unaryAggregateOperations(op, c, rl, ru);
+        // double[] c = ret.getDenseBlockValues();
+        int[] rnnz = (op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof Builtin) ? new int[ru -
+            rl] : null;
+        int numberDenseColumns = 0;
+        for(ColGroup grp : groups){
+            if(grp != null && !(grp instanceof ColGroupUncompressed)) {
+                grp.unaryAggregateOperations(op, ret, rl, ru);
+                if(grp.isDense()){
+                    numberDenseColumns += grp.getNumCols();
+                }
+                else if(op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof Builtin) {
+                    grp.countNonZerosPerRow(rnnz, rl, ru);
+                }
+            }
+        }
+
+        if(op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof Builtin) {
+            for(int row = rl; row < ru; row++) {
+                if(rnnz[row] + numberDenseColumns < numColumns) {
+                    ret.quickSetValue(row, 0, op.aggOp.increOp.fn.execute(ret.quickGetValue(row, 0), 0.0));
+                }
+            }
+
+        }
 
     }
 
@@ -300,14 +321,16 @@
         private final int _rl;
         private final int _ru;
         private final MatrixBlock _ret;
+        private final int _numColumns;
         private final AggregateUnaryOperator _op;
 
-        protected UnaryAggregateTask(List<ColGroup> groups, MatrixBlock ret, int rl, int ru,
-            AggregateUnaryOperator op) {
+        protected UnaryAggregateTask(List<ColGroup> groups, MatrixBlock ret, int rl, int ru, AggregateUnaryOperator op,
+            int numColumns) {
             _groups = groups;
             _op = op;
             _rl = rl;
             _ru = ru;
+            _numColumns = numColumns;
 
             if(_op.indexFn instanceof ReduceAll) { // sum
                 _ret = new MatrixBlock(ret.getNumRows(), ret.getNumColumns(), false);
@@ -326,7 +349,7 @@
 
        @Override
        public MatrixBlock call() {
            // Run the aggregation over this task's row range into the task-local result block.
            aggregateUnaryOperations(_op, _groups, _ret, _rl, _ru, _numColumns);
            return _ret;
        }
     }
@@ -349,7 +372,7 @@
                 _ret.allocateDenseBlock();
             }
             else if(_op.indexFn instanceof ReduceCol) {
-                _ret = new MatrixBlock(ru - rl, ret.getNumColumns(), false);
+                _ret = new MatrixBlock(ru - rl, 1, false);
                 _ret.allocateDenseBlock();
             }
             else {
@@ -376,7 +399,7 @@
             }
 
             for(ColGroup g : _groups) {
-                g.decompressToBlock(tmp, _rl, _ru, 0, g.getValues());
+                g.decompressToBlockSafe(tmp, _rl, _ru, 0, g.getValues(), false);
             }
 
             LibMatrixAgg.aggregateUnaryMatrix(tmp, _ret, _op);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java b/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java
index 258f556..334e812 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java
@@ -40,9 +40,10 @@
 import org.apache.sysds.runtime.compress.utils.LinearAlgebraUtils;
 import org.apache.sysds.runtime.data.DenseBlock;
 import org.apache.sysds.runtime.data.SparseBlock;
-import org.apache.sysds.runtime.data.SparseRow;
+import org.apache.sysds.runtime.functionobjects.SwapIndex;
 import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
 import org.apache.sysds.runtime.util.CommonThreadPool;
 
 public class LibLeftMultBy {
@@ -56,44 +57,53 @@
 	};
 
 	public static MatrixBlock leftMultByMatrix(List<ColGroup> groups, MatrixBlock that, MatrixBlock ret,
-		boolean doTranspose, boolean allocTmp, int rl, int cl, boolean overlapping, int k, Pair<Integer, int[]> v) {
-
+		boolean doTranspose, boolean allocTmp, int numCols, boolean overlapping, int k, Pair<Integer, int[]> v) {
+		int numRowsOutput = doTranspose ? that.getNumColumns() : that.getNumRows();
 		if(ret == null)
-			ret = new MatrixBlock(rl, cl, false, rl * cl);
-		else if(!(ret.getNumColumns() == cl && ret.getNumRows() == rl && ret.isAllocated()))
-			ret.reset(rl, cl, false, rl * cl);
-		if(that instanceof CompressedMatrixBlock) {
-			LOG.info("Decompression Left side Matrix (Should not really happen)");
-		}
-		that = that instanceof CompressedMatrixBlock ? ((CompressedMatrixBlock) that).decompress() : that;
+			ret = new MatrixBlock(numRowsOutput, numCols, false, numRowsOutput * numCols);
+		else if(!(ret.getNumColumns() == numCols && ret.getNumRows() == numRowsOutput && ret.isAllocated()))
+			ret.reset(numRowsOutput, numCols, false, numRowsOutput * numCols);
 
-		// if(that.getNumRows() == 1) {
-		// if(k > 1) {
-		// return leftMultByVectorTranspose(groups, that, ret, doTranspose, k, v, overlapping);
-		// }
-		// else {
-		// return leftMultByVectorTranspose(groups, that, ret, doTranspose, true, v, overlapping);
-		// }
-		// }
-		// else {
-		return leftMultByMatrix(groups, that, ret, k, cl, v, overlapping);
-		// }
+		if(that instanceof CompressedMatrixBlock) {
+			if(doTranspose) {
+				return leftMultByCompressedTransposedMatrix(groups,
+					(CompressedMatrixBlock) that,
+					ret,
+					k,
+					numCols,
+					v,
+					overlapping);
+			}
+			else {
+				LOG.error("Decompression Left side Matrix (Should not really happen)");
+				that = ((CompressedMatrixBlock) that).decompress(k);
+			}
+		}
+		else if(doTranspose) {
+			ReorgOperator r_op = new ReorgOperator(SwapIndex.getSwapIndexFnObject(), k);
+			that = that.reorgOperations(r_op, new MatrixBlock(), 0, 0, 0);
+		}
+
+		return leftMultByMatrix(groups, that, ret, k, numCols, v, overlapping);
+
 	}
 
-	public static void leftMultByTransposeSelf(List<ColGroup> groups, MatrixBlock result, int gl, int gu, int k,
-		int numColumns, Pair<Integer, int[]> v, boolean overlapping) {
-		if(k <= 1 || overlapping) {
-			leftMultByTransposeSelf(groups, result, gl, gu, v, overlapping);
+	public static void leftMultByTransposeSelf(List<ColGroup> groups, MatrixBlock result, int k, int numColumns,
+		Pair<Integer, int[]> v, boolean overlapping) {
+
+		if(k <= 1) {
+			int cl = 0;
+			int cu = numColumns;
+			leftMultByTransposeSelfOverlapping(groups, result, v, cl, cu, overlapping);
 		}
 		else {
 			try {
 				ExecutorService pool = CommonThreadPool.get(k);
-				ArrayList<MatrixMultTransposeTask> tasks = new ArrayList<>();
-				int numgrp = groups.size();
-				int blklen = (int) (Math.ceil((double) numgrp / (2 * k)));
-				for(int i = 0; i < 2 * k & i * blklen < numColumns; i++)
-					tasks.add(new MatrixMultTransposeTask(groups, result, i * blklen,
-						Math.min((i + 1) * blklen, numgrp), v, overlapping));
+				ArrayList<MatrixMultTransposeTaskOverlapping> tasks = new ArrayList<>();
+				int blklen = (int) (Math.ceil((double) numColumns / k));
+				for(int i = 0; i * blklen < numColumns; i++)
+					tasks.add(new MatrixMultTransposeTaskOverlapping(groups, result, i * blklen,
+						Math.min((i + 1) * blklen, numColumns), v, overlapping));
 				List<Future<Object>> ret = pool.invokeAll(tasks);
 				for(Future<Object> tret : ret)
 					tret.get(); // check for errors
@@ -105,6 +115,54 @@
 		}
 	}
 
+	public static MatrixBlock leftMultByCompressedTransposedMatrix(List<ColGroup> colGroups, CompressedMatrixBlock that,
+		MatrixBlock ret, int k, int numColumns, Pair<Integer, int[]> v, boolean overlapping) {
+
+		if(ret == null)
+			ret = new MatrixBlock(that.getNumColumns(), numColumns, true, -1);
+		else
+			ret.reset(that.getNumColumns(), numColumns, true, -1);
+		ret.allocateDenseBlock();
+		Pair<Integer, int[]> thatV = that.getMaxNumValues();
+		LOG.error("Left mult by compressed transposed matrix: threads" + k);
+		if(k <= 1) {
+			int rl = 0;
+			int ru = that.getNumColumns();
+			leftMultByTranspose(colGroups, that.getColGroups(), ret, v, thatV, rl, ru, overlapping, 1);
+		}
+		else {
+			try {
+				ExecutorService pool = CommonThreadPool.get(k);
+				ArrayList<leftMultByCompressedTransposedMatrixTask> tasks = new ArrayList<>();
+				int blklen = (int) (Math.ceil((double) that.getNumColumns() / k));
+				int numBlocks = that.getNumColumns() / blklen;
+				int numExtraThreads = k / numBlocks;
+				LOG.error("overlapping  : " + overlapping);
+				// if(!overlapping) {
+				// for(int i = 0; i < that.getNumColumns(); i++) {
+				// tasks.add(new leftMultByCompressedTransposedMatrixTask(colGroups, that.getColGroups(), ret, v,
+				// thatV, i * blklen, Math.min((i + 1) * blklen, that.getNumColumns()), overlapping, ));
+				// }
+				// }
+				// else {
+				for(int i = 0; i * blklen < that.getNumColumns(); i++)
+					tasks.add(new leftMultByCompressedTransposedMatrixTask(colGroups, that.getColGroups(), ret, v,
+						thatV, i * blklen, Math.min((i + 1) * blklen, that.getNumColumns()), overlapping,
+						numExtraThreads));
+				// }
+				List<Future<Object>> futures = pool.invokeAll(tasks);
+				LOG.error("tasks: " + futures.size() + "  Each task has threads: " + numExtraThreads);
+				for(Future<Object> tret : futures)
+					tret.get(); // check for errors
+				pool.shutdown();
+			}
+			catch(InterruptedException | ExecutionException e) {
+				throw new DMLRuntimeException(e);
+			}
+		}
+		return ret;
+	}
+
 	private static MatrixBlock leftMultByMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k,
 		int numColumns, Pair<Integer, int[]> v, boolean overlapping) {
 		ret.allocateDenseBlock();
@@ -140,6 +198,8 @@
 
 			if(k == 1) {
 				// Pair<Integer, int[]> v = getMaxNumValues(colGroups);
+
+				ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1);
 				for(int j = 0; j < colGroups.size(); j++) {
 					colGroups.get(j).leftMultByMatrix(thatV,
 						retV,
@@ -150,6 +210,7 @@
 						ret.getNumRows(),
 						0);
 				}
+				ColGroupValue.cleanupThreadLocalMemory();
 			}
 			else {
 				try {
@@ -200,6 +261,7 @@
 					result.getDenseBlockValues(),
 					v.getRight()[i]);
 			}
+			ColGroupValue.cleanupThreadLocalMemory();
 		}
 		else {
 
@@ -211,8 +273,7 @@
 		// delegate matrix-vector operation to each column group
 
 		// post-processing
-		if(allocTmp)
-			ColGroupValue.cleanupThreadLocalMemory();
+		// if(allocTmp)
 		result.recomputeNonZeros();
 
 		return result;
@@ -288,41 +349,43 @@
 			}
 		}
 		if(k == 1) {
-			double[] materializedRow = containsOLE ? new double[CompressionSettings.BITMAP_BLOCK_SZ * 2] : null;
+			double[] tmpA = containsOLE ? new double[CompressionSettings.BITMAP_BLOCK_SZ * 2] : null;
 
-			for(int r = 0; r < that.getNumRows(); r++) {
-				SparseRow row = sb.get(r);
-				if(row != null) {
-
-					for(int j = 0; j < colGroups.size(); j++) {
-						colGroups.get(j).leftMultBySparseMatrix(row.size(),
-							row.indexes(),
-							row.values(),
+			ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1);
+			for(int j = 0; j < colGroups.size(); j++) {
+				for(int r = 0; r < that.getNumRows(); r++) {
+					if(!sb.isEmpty(r)) {
+						colGroups.get(j).leftMultBySparseMatrix(sb,
 							ret.getDenseBlockValues(),
-							v.getRight()[j],
 							materialized[j],
 							that.getNumRows(),
-							ret.getNumColumns(),
+							numColumns,
 							r,
-							materializedRow);
+							tmpA);
 					}
 				}
 			}
+			ColGroupValue.cleanupThreadLocalMemory();
 		}
 		else {
 			ExecutorService pool = CommonThreadPool.get(k);
 			ArrayList<LeftMatrixSparseMatrixMultTask> tasks = new ArrayList<>();
 			try {
-
-				for(int r = 0; r < that.getNumRows(); r++) {
+				// long thatnnz = that.getNonZeros();
+				// int rowBlockSize = that.getNumRows() / k;
+				int rowBlockSize = (int) Math.ceil(1000.0 / (that.getNonZeros() / that.getNumRows()));
+				// rowBlockSize = 1;
+				for(int r = 0; r * rowBlockSize < that.getNumRows(); r++) {
 					if(overlapping) {
 						tasks.add(new LeftMatrixSparseMatrixMultTask(colGroups, materialized, sb,
-							ret.getDenseBlockValues(), that.getNumRows(), numColumns, v, r, r + 1));
+							ret.getDenseBlockValues(), that.getNumRows(), numColumns, v, r * rowBlockSize,
+							Math.min((r + 1) * rowBlockSize, that.getNumRows())));
 					}
 					else {
 						for(int i = 0; i < colGroups.size(); i++) {
 							tasks.add(new LeftMatrixSparseMatrixMultTask(colGroups.get(i), materialized, i, sb,
-								ret.getDenseBlockValues(), that.getNumRows(), numColumns, v, r, r + 1));
+								ret.getDenseBlockValues(), that.getNumRows(), numColumns, v, r * rowBlockSize,
+								Math.min((r + 1) * rowBlockSize, that.getNumRows())));
 						}
 					}
 				}
@@ -342,8 +405,69 @@
 
 	}
 
-	private static void leftMultByTransposeSelf(List<ColGroup> groups, MatrixBlock result, int gl, int gu,
-		Pair<Integer, int[]> v, boolean overlapping) {
+	// private static void leftMultByTransposeSelfNonOverlapping(List<ColGroup> groups, MatrixBlock result,
+	// Pair<Integer, int[]> v, int gl, int gu) {
+
+	// // TODO exploit potential multiplication in compressed format.
+
+	// final int numRows = groups.get(0).getNumRows();
+
+	// // preallocated dense tmp matrix blocks
+	// MatrixBlock lhs = new MatrixBlock(1, numRows, false);
+	// MatrixBlock tmpret = new MatrixBlock(1, result.getNumColumns(), false);
+	// lhs.allocateDenseBlock();
+	// tmpret.allocateDenseBlock();
+
+	// // setup memory pool for reuse
+	// ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1);
+
+	// // approach: for each colgroup, extract uncompressed columns one at-a-time
+	// // vector-matrix multiplies against remaining col groups
+	// for(int i = gl; i < gu; i++) {
+	// // get current group and relevant col groups
+	// ColGroup group = groups.get(i);
+	// int[] ixgroup = group.getColIndices();
+	// List<ColGroup> tmpList = groups.subList(i, groups.size());
+
+	// // if(group instanceof ColGroupDDC // single DDC group
+	// // && ixgroup.length == 1 && !containsUC && numRows < CompressionSettings.BITMAP_BLOCK_SZ) {
+	// // // compute vector-matrix partial result
+	// // leftMultByVectorTranspose(tmpList, (ColGroupDDC) group, tmpret);
+
+	// // // write partial results (disjoint non-zeros)
+	// // LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, ixgroup[0]);
+	// // }
+	// // else {
+	// // for all uncompressed lhs columns vectors
+	// for(int j = 0; j < result.getNumColumns(); j++) {
+	// ColGroup.decompressToBlock(lhs, j, groups);
+
+	// if(!lhs.isEmptyBlock(false)) {
+	// // tmpret.reset();
+	// // compute vector-matrix partial result
+	// // leftMultByMatrix(groups,lhs, tmpret, false, true, 0, 0, overlapping, 1, v );
+	// leftMultByVectorTranspose(groups, lhs, tmpret, false, true, v, overlapping);
+	// // LOG.error(tmpret);
+
+	// // write partial results (disjoint non-zeros)
+	// LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, j);
+	// }
+	// lhs.reset();
+	// // }
+	// }
+	// }
+
+	// // post processing
+	// ColGroupValue.cleanupThreadLocalMemory();
+	// }
+
+	private static void leftMultByTransposeSelfOverlapping(List<ColGroup> groups, MatrixBlock result,
+		Pair<Integer, int[]> v, int cl, int cu, boolean overlapping) {
+		// It should be possible to get better performance by exploiting the case where the matrix is not overlapping.
+		// TODO: exploit specific column groups (DDC most likely) to gain better performance.
+		// Idea: when multiplying a matrix with itself, simply use the count of each value, and then
+		// calculate: count * v^2
+
 		final int numRows = groups.get(0).getNumRows();
 
 		// preallocated dense tmp matrix blocks
@@ -355,46 +479,71 @@
 		// setup memory pool for reuse
 		ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1);
 
-		// approach: for each colgroup, extract uncompressed columns one at-a-time
-		// vector-matrix multiplies against remaining col groups
-		// for(int i = gl; i < gu; i++) {
-		// get current group and relevant col groups
-		// ColGroup group = groups.get(i);
-		// int[] ixgroup = group.getColIndices();
-		// List<ColGroup> tmpList = groups.subList(i, numGroups);
-
-		// if(group instanceof ColGroupDDC // single DDC group
-		// && ixgroup.length == 1 && !containsUC && numRows < CompressionSettings.BITMAP_BLOCK_SZ) {
-		// // compute vector-matrix partial result
-		// leftMultByVectorTranspose(tmpList, (ColGroupDDC) group, tmpret);
-
-		// // write partial results (disjoint non-zeros)
-		// LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, ixgroup[0]);
-		// }
-		// else {
-		// for all uncompressed lhs columns vectors
-		for(int j = 0; j < result.getNumColumns(); j++) {
+		for(int j = cl; j < cu; j++) {
 			ColGroup.decompressToBlock(lhs, j, groups);
-
 			if(!lhs.isEmptyBlock(false)) {
-				// tmpret.reset();
-				// compute vector-matrix partial result
-				// leftMultByMatrix(groups,lhs, tmpret, false, true, 0, 0, overlapping, 1, v );
 				leftMultByVectorTranspose(groups, lhs, tmpret, false, true, v, overlapping);
-				// LOG.error(tmpret);
-
-				// write partial results (disjoint non-zeros)
 				LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, j);
 			}
 			lhs.reset();
-			// }
-			// }
 		}
 
 		// post processing
 		ColGroupValue.cleanupThreadLocalMemory();
 	}
 
+	private static void leftMultByTranspose(List<ColGroup> thisGroups, List<ColGroup> thatGroups, MatrixBlock result,
+		Pair<Integer, int[]> v, Pair<Integer, int[]> thatV, int rl, int ru, boolean overlapping, int k) {
+
+		final int numRows = thisGroups.get(0).getNumRows();
+
+		// preallocated dense tmp matrix blocks
+		MatrixBlock lhs = new MatrixBlock(1, numRows, false);
+		MatrixBlock tmpret = new MatrixBlock(1, result.getNumColumns(), false);
+		lhs.allocateDenseBlock();
+		tmpret.allocateDenseBlock();
+		if(k > 1)
+			ColGroupValue.setupThreadLocalMemory(Math.max(v.getLeft(), thatV.getLeft()) + 1);
+
+		ExecutorService pool = (k > 1) ? CommonThreadPool.get(k) : null;
+		ArrayList<leftMultByVectorTransposeTask> tasks = (k > 1) ? new ArrayList<>() : null;
+		for(int j = rl; j < ru; j++) {
+			ColGroup.decompressToBlock(lhs, j, thatGroups);
+			if(!lhs.isEmptyBlock(false)) {
+				if(!overlapping && k > 1) {
+					try {
+						int groupBatch = thisGroups.size() / k;
+
+						for(int i = 0; i * groupBatch < thisGroups.size(); i++) {
+							tasks.add(new leftMultByVectorTransposeTask(thisGroups, lhs, tmpret, i * groupBatch,
+								Math.min(thisGroups.size(), (i + 1) * groupBatch), v));
+						}
+						List<Future<Object>> futures = pool.invokeAll(tasks);
+						pool.shutdown();
+						for(Future<Object> future : futures)
+							future.get();
+					}
+					catch(InterruptedException | ExecutionException e) {
+						throw new DMLRuntimeException(e);
+					}
+				}
+				else {
+					for(ColGroup grp : thisGroups) {
+						grp.leftMultByRowVector(lhs.getDenseBlockValues(), tmpret.getDenseBlockValues(), -1);
+					}
+				}
+				for(int i = 0; i < tmpret.getNumColumns(); i++) {
+					result.appendValue(j, i, tmpret.quickGetValue(0, i));
+				}
+			}
+			lhs.reset();
+		}
+
+		// post processing
+		ColGroupValue.cleanupThreadLocalMemory();
+
+	}
+
 	private static class LeftMatrixVectorMultTask implements Callable<Object> {
 		private final List<ColGroup> _groups;
 		private final MatrixBlock _vect;
@@ -524,46 +673,39 @@
 			// Temporary Array to store 2 * block size in
 			double[] tmpA = memPoolOLE.get();
 			if(tmpA == null) {
-				tmpA = new double[CompressionSettings.BITMAP_BLOCK_SZ * 2];
+				if(_groups != null) {
+					tmpA = new double[Math.min(CompressionSettings.BITMAP_BLOCK_SZ * 2, _groups.get(0).getNumRows())];
+				}
+				else {
+					tmpA = new double[Math.min(CompressionSettings.BITMAP_BLOCK_SZ * 2, _group.getNumRows())];
+				}
 			}
 			else {
 				Arrays.fill(tmpA, 0.0);
 			}
 
-			ColGroupValue.setupThreadLocalMemory(_v.getLeft());
+			ColGroupValue.setupThreadLocalMemory(_v.getLeft() + 1);
 			try {
 				if(_groups != null) {
 					for(int j = 0; j < _groups.size(); j++) {
 						double[] materializedV = _materialized[j];
 						for(int r = _rl; r < _ru; r++) {
-							if(_that.get(r) != null) {
-								_groups.get(j).leftMultBySparseMatrix(_that.get(r).size(),
-									_that.get(r).indexes(),
-									_that.get(r).values(),
-									_ret,
-									_v.getRight()[j],
-									materializedV,
-									_numRows,
-									_numCols,
-									r,
-									tmpA);
+							if(!_that.isEmpty(r)) {
+								// LOG.error(_that.get(r));
+								// _v.getRight()[j],
+								_groups.get(j)
+									.leftMultBySparseMatrix(_that, _ret, materializedV, _numRows, _numCols, r, tmpA);
+								// Arrays.fill(tmpA, 0.0);
 							}
 						}
 					}
 				}
 				else if(_group != null) {
 					for(int r = _rl; r < _ru; r++) {
-						if(_that.get(r) != null) {
-							_group.leftMultBySparseMatrix(_that.get(r).size(),
-								_that.get(r).indexes(),
-								_that.get(r).values(),
-								_ret,
-								_v.getRight()[0],
-								_materialized[_i],
-								_numRows,
-								_numCols,
-								r,
-								tmpA);
+						if(!_that.isEmpty(r)) {
+							// _v.getRight()[0],
+							_group.leftMultBySparseMatrix(_that, _ret, _materialized[_i], _numRows, _numCols, r, tmpA);
+							// Arrays.fill(tmpA, 0.0);
 						}
 					}
 				}
@@ -577,7 +719,30 @@
 		}
 	}
 
-	private static class MatrixMultTransposeTask implements Callable<Object> {
+	// private static class MatrixMultTransposeTaskNonOverlapping implements Callable<Object> {
+	// private final List<ColGroup> _groups;
+	// private final MatrixBlock _ret;
+	// private final int _gl;
+	// private final int _gu;
+	// private final Pair<Integer, int[]> _v;
+
+	// protected MatrixMultTransposeTaskNonOverlapping(List<ColGroup> groups, MatrixBlock ret, int gl, int gu,
+	// Pair<Integer, int[]> v, boolean overlapping) {
+	// _groups = groups;
+	// _ret = ret;
+	// _gl = gl;
+	// _gu = gu;
+	// _v = v;
+	// }
+
+	// @Override
+	// public Object call() {
+	// leftMultByTransposeSelfNonOverlapping(_groups, _ret, _v, _gl, _gu);
+	// return null;
+	// }
+	// }
+
+	private static class MatrixMultTransposeTaskOverlapping implements Callable<Object> {
 		private final List<ColGroup> _groups;
 		private final MatrixBlock _ret;
 		private final int _gl;
@@ -585,7 +750,7 @@
 		private final Pair<Integer, int[]> _v;
 		private final boolean _overlapping;
 
-		protected MatrixMultTransposeTask(List<ColGroup> groups, MatrixBlock ret, int gl, int gu,
+		protected MatrixMultTransposeTaskOverlapping(List<ColGroup> groups, MatrixBlock ret, int gl, int gu,
 			Pair<Integer, int[]> v, boolean overlapping) {
 			_groups = groups;
 			_ret = ret;
@@ -597,7 +762,68 @@
 
 		@Override
 		public Object call() {
-			leftMultByTransposeSelf(_groups, _ret, _gl, _gu, _v, _overlapping);
+			leftMultByTransposeSelfOverlapping(_groups, _ret, _v, _gl, _gu, _overlapping);
+			return null;
+		}
+	}
+
+	private static class leftMultByCompressedTransposedMatrixTask implements Callable<Object> {
+		private final List<ColGroup> _groups;
+		private final List<ColGroup> _thatGroups;
+		private final MatrixBlock _ret;
+		private final int _rl;
+		private final int _ru;
+		private final Pair<Integer, int[]> _v;
+		private final Pair<Integer, int[]> _thatV;
+		private final boolean _overlapping;
+		private final int _extraThreads;
+
+		protected leftMultByCompressedTransposedMatrixTask(List<ColGroup> thisGroups, List<ColGroup> thatGroups,
+			MatrixBlock ret, Pair<Integer, int[]> v, Pair<Integer, int[]> thatV, int rl, int ru, boolean overlapping,
+			int extraThreads) {
+			_groups = thisGroups;
+			_thatGroups = thatGroups;
+			_ret = ret;
+			_rl = rl;
+			_ru = ru;
+			_v = v;
+			_thatV = thatV;
+			_overlapping = overlapping;
+			_extraThreads = extraThreads;
+		}
+
+		@Override
+		public Object call() {
+			leftMultByTranspose(_groups, _thatGroups, _ret, _v, _thatV, _rl, _ru, _overlapping, _extraThreads);
+			return null;
+		}
+	}
+
+	private static class leftMultByVectorTransposeTask implements Callable<Object> {
+		private final List<ColGroup> _grps;
+		private final MatrixBlock _rowVector;
+		private final MatrixBlock _result;
+		private final int _gl;
+		private final int _gu;
+		private final Pair<Integer, int[]> _v;
+
+		protected leftMultByVectorTransposeTask(List<ColGroup> grps, MatrixBlock rowVector, MatrixBlock result, int gl,
+			int gu, Pair<Integer, int[]> v) {
+			_grps = grps;
+			_rowVector = rowVector;
+			_result = result;
+			_gl = gl;
+			_gu = gu;
+			_v = v;
+		}
+
+		@Override
+		public Object call() {
+			ColGroupValue.setupThreadLocalMemory(_v.getLeft() + 1);
+			for(int i = _gl; i < _gu; i++) {
+				_grps.get(i).leftMultByRowVector(_rowVector.getDenseBlockValues(), _result.getDenseBlockValues(), -1);
+			}
+			ColGroupValue.cleanupThreadLocalMemory();
 			return null;
 		}
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibRelationalOp.java b/src/main/java/org/apache/sysds/runtime/compress/lib/LibRelationalOp.java
index a19a3c0..206c522 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibRelationalOp.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibRelationalOp.java
@@ -199,7 +199,7 @@
             res.setNonZeros(nnz);
         }
         else {
-            final int blkz = CompressionSettings.BITMAP_BLOCK_SZ / cols;
+            final int blkz = CompressionSettings.BITMAP_BLOCK_SZ / 2;
             ExecutorService pool = CommonThreadPool.get(k);
             ArrayList<RelationalTask> tasks = new ArrayList<>();
 
@@ -287,7 +287,12 @@
             }
 
             for(MinMaxGroup mmg : _minMax) {
-                mmg.g.decompressToBlock(tmp, _i * _blkz, Math.min((_i + 1) * _blkz, mmg.g.getNumRows()), 0, mmg.values);
+                mmg.g.decompressToBlockSafe(tmp,
+                    _i * _blkz,
+                    Math.min((_i + 1) * _blkz, mmg.g.getNumRows()),
+                    0,
+                    mmg.values,
+                    false);
             }
 
             for(int row = 0, off = _i * _blkz; row < _blkz && row < _rows - _i * _blkz; row++, off++) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java b/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java
index b74cbc2..847a53a 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java
@@ -49,21 +49,21 @@
 	 * Right multiply by matrix. Meaning a left hand side compressed matrix is multiplied with a right hand side
 	 * uncompressed matrix.
 	 * 
-	 * @param colGroups	All Column groups in the compression
-	 * @param that		 The right hand side matrix
-	 * @param ret		  The MatrixBlock to return.
-	 * @param k			The parallelization degree to use.
-	 * @param v			The Precalculated counts and Maximum number of tuple entries in the column groups.
+	 * @param colGroups    All Column groups in the compression
+	 * @param that         The right hand side matrix
+	 * @param ret          The MatrixBlock to return.
+	 * @param k            The parallelization degree to use.
+	 * @param v            The Precalculated counts and Maximum number of tuple entries in the column groups.
 	 * @param allowOverlap Allow the multiplication to return an overlapped matrix.
 	 * @return The Result Matrix, modified from the ret parameter.
 	 */
 	public static MatrixBlock rightMultByMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k,
 		Pair<Integer, int[]> v, boolean allowOverlap) {
 
-		if(that instanceof CompressedMatrixBlock){
+		if(that instanceof CompressedMatrixBlock) {
 			LOG.info("Decompression Right matrix");
 		}
-		that = that instanceof CompressedMatrixBlock ? ((CompressedMatrixBlock) that).decompress() : that;
+		that = that instanceof CompressedMatrixBlock ? ((CompressedMatrixBlock) that).decompress(k) : that;
 
 		boolean containsUncompressable = false;
 		int distinctCount = 0;
@@ -77,7 +77,8 @@
 		}
 		int rl = colGroups.get(0).getNumRows();
 		int cl = that.getNumColumns();
-		if(!allowOverlap || (containsUncompressable || distinctCount >= rl )) {
+		if(!allowOverlap || (containsUncompressable || distinctCount >= rl / 4)) {
+			LOG.info("outputting non overlapping matrix right mm");
 			if(ret == null)
 				ret = new MatrixBlock(rl, cl, false, rl * cl);
 			else if(!(ret.getNumColumns() == cl && ret.getNumRows() == rl && ret.isAllocated()))
@@ -88,14 +89,13 @@
 			}
 			else {
 				ret = rightMultByDenseMatrix(colGroups, that, ret, k, v);
-
 			}
 			ret.setNonZeros(ret.getNumColumns() * ret.getNumRows());
 		}
 		else {
+			LOG.debug("Outputting Overlapping Matrix");
 			// Create an overlapping compressed Matrix Block.
 			ret = new CompressedMatrixBlock(true);
-
 			ret.setNumColumns(cl);
 			ret.setNumRows(rl);
 			CompressedMatrixBlock retC = (CompressedMatrixBlock) ret;
@@ -116,10 +116,10 @@
 	 * Multi-threaded version of rightMultByVector.
 	 * 
 	 * @param colGroups The Column groups used int the multiplication
-	 * @param vector	matrix block vector to multiply with
-	 * @param result	matrix block result to modify in the multiplication
-	 * @param k		 number of threads to use
-	 * @param v		 The Precalculated counts and Maximum number of tuple entries in the column groups
+	 * @param vector    matrix block vector to multiply with
+	 * @param result    matrix block result to modify in the multiplication
+	 * @param k         number of threads to use
+	 * @param v         The Precalculated counts and Maximum number of tuple entries in the column groups
 	 */
 	public static void rightMultByVector(List<ColGroup> colGroups, MatrixBlock vector, MatrixBlock result, int k,
 		Pair<Integer, int[]> v) {
@@ -171,7 +171,7 @@
 	 * 
 	 * @param vector right-hand operand of the multiplication
 	 * @param result buffer to hold the result; must have the appropriate size already
-	 * @param v	  The Precalculated counts and Maximum number of tuple entries in the column groups.
+	 * @param v      The Precalculated counts and Maximum number of tuple entries in the column groups.
 	 */
 	private static void rightMultByVector(List<ColGroup> colGroups, MatrixBlock vector, MatrixBlock result,
 		Pair<Integer, int[]> v) {
@@ -253,12 +253,12 @@
 		}
 
 		if(k == 1) {
-			ColGroupValue.setupThreadLocalMemory((v.getLeft()));
+			int colBlockSize = 128;
+			ColGroupValue.setupThreadLocalMemory(colBlockSize * v.getLeft());
 			for(int b = 0; b < db.numBlocks(); b++) {
 				// int blockSize = db.blockSize(b);
 				thatV = db.valuesAt(b);
 				for(int j = 0; j < colGroups.size(); j++) {
-					int colBlockSize = 128;
 					for(int i = 0; i < that.getNumColumns(); i += colBlockSize) {
 						if(colGroups.get(j) instanceof ColGroupValue) {
 							double[] preAggregatedB = ((ColGroupValue) colGroups.get(j)).preaggValues(v.getRight()[j],
@@ -284,7 +284,6 @@
 			ColGroupValue.cleanupThreadLocalMemory();
 		}
 		else {
-
 			thatV = db.valuesAt(0);
 			ExecutorService pool = CommonThreadPool.get(k);
 			ArrayList<RightMatrixMultTask> tasks = new ArrayList<>();
@@ -299,7 +298,7 @@
 				for(int j = 0; j * blklenRows < ret.getNumRows(); j++) {
 					RightMatrixMultTask rmmt = new RightMatrixMultTask(colGroups, retV, ag, v, that.getNumColumns(),
 						j * blklenRows, Math.min((j + 1) * blklenRows, ret.getNumRows()), 0, that.getNumColumns(),
-						false, false);
+						false);
 					tasks.add(rmmt);
 				}
 				blklenRows += (blklenRows % blkz != 0) ? blkz - blklenRows % blkz : 0;
@@ -307,7 +306,7 @@
 				for(int j = 0; j * blklenRows < ret.getNumRows(); j++) {
 					RightMatrixMultTask rmmt = new RightMatrixMultTask(colGroups, retV, ag, v, that.getNumColumns(),
 						j * blklenRows, Math.min((j + 1) * blklenRows, ret.getNumRows()), 0, that.getNumColumns(),
-						false, true);
+						true);
 					tasks.add(rmmt);
 				}
 				for(Future<Object> future : pool.invokeAll(tasks))
@@ -381,6 +380,10 @@
 		CompressedMatrixBlock ret, int k, Pair<Integer, int[]> v) {
 
 		SparseBlock sb = that.getSparseBlock();
+		if(sb == null) {
+			throw new DMLRuntimeException(
+				"right Mult By sparse Matrix compressed should only be called with an sparse input");
+		}
 
 		for(ColGroup grp : colGroups) {
 			if(grp instanceof ColGroupUncompressed) {
@@ -397,7 +400,7 @@
 		if(k == 1) {
 			for(int j = 0; j < colGroups.size(); j++) {
 				ColGroupValue g = (ColGroupValue) colGroups.get(j);
-				double[] preAggregatedB = g.preaggValues(v.getRight()[j],
+				double[] preAggregatedB = g.preaggValues(v.getRight()[j] / g.getNumCols(),
 					sb,
 					colGroups.get(j).getValues(),
 					0,
@@ -444,8 +447,8 @@
 		preTask.clear();
 		for(int h = 0; h < colGroups.size(); h++) {
 			RightMatrixPreAggregateSparseTask pAggT = new RightMatrixPreAggregateSparseTask(
-				(ColGroupValue) colGroups.get(h), v.getRight()[h], sb, colGroups.get(h).getValues(), 0,
-				that.getNumColumns(), that.getNumColumns());
+				(ColGroupValue) colGroups.get(h), v.getRight()[h] / colGroups.get(h).getNumCols(), sb,
+				colGroups.get(h).getValues(), 0, that.getNumColumns(), that.getNumColumns());
 			preTask.add(pAggT);
 		}
 		return preTask;
@@ -501,11 +504,10 @@
 		private final int _ru;
 		private final int _cl;
 		private final int _cu;
-		private final boolean _mem;
 		private final boolean _skipOle;
 
 		protected RightMatrixMultTask(List<ColGroup> groups, double[] retV, List<Future<double[]>> aggB,
-			Pair<Integer, int[]> v, int numColumns, int rl, int ru, int cl, int cu, boolean mem, boolean skipOle) {
+			Pair<Integer, int[]> v, int numColumns, int rl, int ru, int cl, int cu, boolean skipOle) {
 			_colGroups = groups;
 			// _thatV = thatV;
 			_retV = retV;
@@ -516,15 +518,13 @@
 			_ru = ru;
 			_cl = cl;
 			_cu = cu;
-			_mem = mem;
 			_skipOle = skipOle;
 		}
 
 		@Override
 		public Object call() {
 			try {
-				if(_mem)
-					ColGroupValue.setupThreadLocalMemory((_v.getLeft()));
+				ColGroupValue.setupThreadLocalMemory((_v.getLeft()));
 				for(int j = 0; j < _colGroups.size(); j++) {
 					if(_colGroups.get(j) instanceof ColGroupOLE) {
 						if(_skipOle) {
@@ -539,8 +539,7 @@
 						}
 					}
 				}
-				if(_mem)
-					ColGroupValue.cleanupThreadLocalMemory();
+				ColGroupValue.cleanupThreadLocalMemory();
 				return null;
 			}
 			catch(Exception e) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java b/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java
index b67b6ef..1aeac1e 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java
@@ -27,11 +27,13 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLCompressionException;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.compress.colgroup.ColGroup;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupConst;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupOLE;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupValue;
 import org.apache.sysds.runtime.compress.colgroup.Dictionary;
@@ -62,7 +64,7 @@
 					ret,
 					overlapping);
 			}
-			else if(sop.fn instanceof Divide){
+			else if(sop.fn instanceof Divide) {
 				throw new DMLCompressionException("Not supported left hand side divide Compressed");
 			}
 			else if(sop.fn instanceof Power2) {
@@ -91,8 +93,10 @@
 			}
 		}
 		else {
-			if(sop.getNumThreads() > 1) {
-				parallelScalarOperations(sop, colGroups, ret, sop.getNumThreads());
+			int threadsAvailable = (sop.getNumThreads() > 1) ? sop.getNumThreads() : OptimizerUtils
+				.getConstrainedNumThreads(-1);
+			if(threadsAvailable > 1) {
+				parallelScalarOperations(sop, colGroups, ret, threadsAvailable);
 			}
 			else {
 				// Apply the operation to each of the column groups.
@@ -140,7 +144,7 @@
 			}
 			else {
 				int nv = ((ColGroupValue) grp).getNumValues() * grp.getColIndices().length;
-				if(nv < MINIMUM_PARALLEL_SIZE) {
+				if(nv < MINIMUM_PARALLEL_SIZE && !(grp instanceof ColGroupOLE)) {
 					small.add(grp);
 				}
 				else {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/Bitmap.java b/src/main/java/org/apache/sysds/runtime/compress/utils/Bitmap.java
index 6a921ee..c2c06f3 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/Bitmap.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/utils/Bitmap.java
@@ -59,7 +59,7 @@
 	}
 
 	public int getNumValues() {
-		return _values.length / _numCols;
+		return (_values == null) ? 0: _values.length / _numCols;
 	}
 
 	public void sortValuesByFrequency() {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/BitmapLossy.java b/src/main/java/org/apache/sysds/runtime/compress/utils/BitmapLossy.java
index abd0f03..8a6df3f 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/BitmapLossy.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/utils/BitmapLossy.java
@@ -40,7 +40,6 @@
 		_scale = scale;
 	}
 
-
 	/**
 	 * Get all values without unnecessary allocations and copies.
 	 * 
@@ -71,7 +70,7 @@
 	 *         bitmap per value
 	 */
 	public int getNumValues() {
-		return _values.length / _numCols;
+		return (_values == null) ? 0 : _values.length / _numCols;
 	}
 
 	public IntArrayList getOffsetsList(int ix) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/CustomHashMap.java b/src/main/java/org/apache/sysds/runtime/compress/utils/CustomHashMap.java
index 417aa81..64bee19 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/CustomHashMap.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/utils/CustomHashMap.java
@@ -18,17 +18,47 @@
  */
 package org.apache.sysds.runtime.compress.utils;
 
+import org.apache.sysds.runtime.DMLRuntimeException;
+
 /**
  * This class provides a memory-efficient base for Custom HashMaps for restricted use cases.
  */
 public abstract class CustomHashMap {
-	protected static final int INIT_CAPACITY = 32;
+	protected static final int INIT_CAPACITY = 8;
 	protected static final int RESIZE_FACTOR = 2;
-	protected static final float LOAD_FACTOR = 0.30f;
+	protected static final float LOAD_FACTOR = 0.50f;
 
 	protected int _size = -1;
 
 	public int size() {
 		return _size;
 	}
+
+	/**
+	 * Joins the two lists of hashmaps together to form one list containing element wise joins of the hashmaps.
+	 * 
+	 * Note that the join modifies the left-hand-side hash maps in place so that they contain the joined values. All
+	 * values in the right-hand side are appended to the left-hand side, such that the order of the elements is
+	 * preserved after the join.
+	 * 
+	 * @param left  The left side hashmaps
+	 * @param right The right side hashmaps
+	 * @return The element-wise join of the two hashmaps.
+	 */
+	public static CustomHashMap[] joinHashMaps(CustomHashMap[] left, CustomHashMap[] right) {
+
+		if(left.length == right.length) {
+			for(int i = 0; i < left.length; i++) {
+				left[i].joinHashMap(right[i]);
+			}
+		}else{
+			throw new DMLRuntimeException("Invalid element wise join of two Hashmaps, of different length.");
+		}
+
+		return left;
+	}
+
+	public CustomHashMap joinHashMap(CustomHashMap that) {
+		return this;
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/DblArrayIntListHashMap.java b/src/main/java/org/apache/sysds/runtime/compress/utils/DblArrayIntListHashMap.java
index 55065b4..c07b004 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/DblArrayIntListHashMap.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/utils/DblArrayIntListHashMap.java
@@ -89,6 +89,40 @@
 			resize();
 	}
 
+	public void appendValue(DblArray key, int value){
+		int hash = hash(key);
+		int ix = indexFor(hash, _data.length);
+		IntArrayList lstPtr = null; // The list to add the value to.
+		if(_data[ix] == null) {
+			lstPtr = new IntArrayList();
+			_data[ix] = new DArrayIListEntry(key, lstPtr);
+			_size++;
+		}
+		else {
+			for(DArrayIListEntry e = _data[ix]; e != null; e = e.next) {
+				if(e.key == key) {
+					lstPtr = e.value;
+					break;
+				}
+				else if(e.next == null) {
+					lstPtr = new IntArrayList();
+					// Swap to place the new value, in front.
+					DArrayIListEntry eOld = _data[ix];
+					_data[ix] = new DArrayIListEntry(key, lstPtr);
+					_data[ix].next = eOld;
+					_size++;
+					break;
+				}
+				DblArrayIntListHashMap.hashMissCount++;
+			}
+		}
+		lstPtr.appendValue(value);
+
+		// resize if necessary
+		if(_size >= LOAD_FACTOR * _data.length)
+			resize();
+	}
+
 	public ArrayList<DArrayIListEntry> extractValues() {
 		ArrayList<DArrayIListEntry> ret = new ArrayList<>();
 		for(DArrayIListEntry e : _data) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/DoubleIntListHashMap.java b/src/main/java/org/apache/sysds/runtime/compress/utils/DoubleIntListHashMap.java
index 56f6169..f85a900 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/DoubleIntListHashMap.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/utils/DoubleIntListHashMap.java
@@ -51,7 +51,15 @@
 
 		// compute entry index position
 		int hash = hash(key);
+		return getHash(key, hash);
+	}
+
+	private IntArrayList getHash(double key, int hash) {
 		int ix = indexFor(hash, _data.length);
+		return getHashIndex(key, ix);
+	}
+
+	private IntArrayList getHashIndex(double key, int ix) {
 
 		// find entry
 		for(DIListEntry e = _data[ix]; e != null; e = e.next) {
@@ -83,6 +91,38 @@
 			resize();
 	}
 
+	public void appendValue(double key, int value) {
+		int hash = hash(key);
+		int ix = indexFor(hash, _data.length);
+		IntArrayList lstPtr = null; // The list to add the value to.
+		if(_data[ix] == null) {
+			lstPtr = new IntArrayList();
+			_data[ix] = new DIListEntry(key, lstPtr);
+			_size++;
+		}
+		else {
+			for(DIListEntry e = _data[ix]; e != null; e = e.next) {
+				if(e.key == key) {
+					lstPtr = e.value;
+					break;
+				}
+				else if(e.next == null) {
+					lstPtr = new IntArrayList();
+					// Swap to place the new value, in front.
+					DIListEntry eOld = _data[ix];
+					_data[ix] = new DIListEntry(key, lstPtr);
+					_data[ix].next = eOld;
+					_size++;
+					break;
+				}
+				DblArrayIntListHashMap.hashMissCount++;
+			}
+		}
+		lstPtr.appendValue(value);
+		if(_size >= LOAD_FACTOR * _data.length)
+			resize();
+	}
+
 	public ArrayList<DIListEntry> extractValues() {
 		ArrayList<DIListEntry> ret = new ArrayList<>();
 		for(DIListEntry e : _data) {
@@ -122,6 +162,8 @@
 	}
 
 	private static int hash(double key) {
+		// return (int) key;
+
 		// basic double hash code (w/o object creation)
 		long bits = Double.doubleToRawLongBits(key);
 		int h = (int) (bits ^ (bits >>> 32));
@@ -159,17 +201,17 @@
 		}
 
 		@Override
-		public String toString(){
+		public String toString() {
 			StringBuilder sb = new StringBuilder();
 			sb.append("[" + key + ", ");
-			sb.append( value + ", ");
-			sb.append( next + "]");
+			sb.append(value + ", ");
+			sb.append(next + "]");
 			return sb.toString();
 		}
 	}
 
 	@Override
-	public String toString(){
+	public String toString() {
 		StringBuilder sb = new StringBuilder();
 		sb.append(this.getClass().getSimpleName() + this.hashCode());
 		sb.append("\n" + Arrays.toString(_data));
diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/IntArrayList.java b/src/main/java/org/apache/sysds/runtime/compress/utils/IntArrayList.java
index 37d11dc..33072eb 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/IntArrayList.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/utils/IntArrayList.java
@@ -42,7 +42,7 @@
 		appendValue(value);
 	}
 
-	public IntArrayList(int[] values){
+	public IntArrayList(int[] values) {
 		_data = values;
 		_size = values.length;
 	}
@@ -101,18 +101,19 @@
 	}
 
 	@Override
-	public String toString(){
+	public String toString() {
 		StringBuilder sb = new StringBuilder();
-		
-		if(_size == 1){
+
+		if(_size == 1) {
 			sb.append(_val0);
-		} else{
+		}
+		else {
 			sb.append("[");
 			int i = 0;
-			for(; i < _size-1; i++){
+			for(; i < _size - 1; i++) {
 				sb.append(_data[i] + ",");
 			}
-			sb.append(_data[i]+"]");
+			sb.append(_data[i] + "]");
 		}
 		return sb.toString();
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/LinearAlgebraUtils.java b/src/main/java/org/apache/sysds/runtime/compress/utils/LinearAlgebraUtils.java
index 1bce819..ddf653d 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/LinearAlgebraUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/utils/LinearAlgebraUtils.java
@@ -275,11 +275,11 @@
 				int row = (ix < i) ? ix : i;
 				int col = (ix < i) ? i : ix;
 				// if(row == col) {
-					c.set(row, col, a[i]);
+				c.set(row, col, a[i]);
 				// }
 				// else {
-					// double v = c.get(row, col);
-					// c.set(row, col, a[i] + v);
+				// double v = c.get(row, col);
+				// c.set(row, col, a[i] + v);
 				// }
 			}
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
index 0df8108..7a1c42a 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
@@ -22,46 +22,97 @@
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.functionobjects.SwapIndex;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
 
 public class AggregateBinaryCPInstruction extends BinaryCPInstruction {
+	// private static final Log LOG = LogFactory.getLog(AggregateBinaryCPInstruction.class.getName());
 
-	private AggregateBinaryCPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, String opcode, String istr) {
+	public boolean transposeLeft;
+	public boolean transposeRight;
+
+	private AggregateBinaryCPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, String opcode,
+		String istr) {
 		super(CPType.AggregateBinary, op, in1, in2, out, opcode, istr);
 	}
 
-	public static AggregateBinaryCPInstruction parseInstruction( String str ) {
+	private AggregateBinaryCPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, String opcode,
+		String istr, boolean transposeLeft, boolean transposeRight) {
+		super(CPType.AggregateBinary, op, in1, in2, out, opcode, istr);
+		this.transposeLeft = transposeLeft;
+		this.transposeRight = transposeRight;
+	}
+
+	public static AggregateBinaryCPInstruction parseInstruction(String str) {
 		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
 		String opcode = parts[0];
 
-		if ( !opcode.equalsIgnoreCase("ba+*")) {
+		if(!opcode.equalsIgnoreCase("ba+*")) {
 			throw new DMLRuntimeException("AggregateBinaryInstruction.parseInstruction():: Unknown opcode " + opcode);
 		}
-		
-		InstructionUtils.checkNumFields(parts, 4);
-		CPOperand in1 = new CPOperand(parts[1]);
-		CPOperand in2 = new CPOperand(parts[2]);
-		CPOperand out = new CPOperand(parts[3]);
-		int k = Integer.parseInt(parts[4]);
-		AggregateBinaryOperator aggbin = InstructionUtils.getMatMultOperator(k);
-		return new AggregateBinaryCPInstruction(aggbin, in1, in2, out, opcode, str);
+		int numFields = parts.length - 1;
+		if(numFields == 4) {
+			CPOperand in1 = new CPOperand(parts[1]);
+			CPOperand in2 = new CPOperand(parts[2]);
+			CPOperand out = new CPOperand(parts[3]);
+			int k = Integer.parseInt(parts[4]);
+			AggregateBinaryOperator aggbin = InstructionUtils.getMatMultOperator(k);
+			return new AggregateBinaryCPInstruction(aggbin, in1, in2, out, opcode, str);
+		}
+		else if(numFields == 6) {
+			CPOperand in1 = new CPOperand(parts[1]);
+			CPOperand in2 = new CPOperand(parts[2]);
+			CPOperand out = new CPOperand(parts[3]);
+			int k = Integer.parseInt(parts[4]);
+			boolean isLeftTransposed = Boolean.parseBoolean(parts[5]);
+			boolean isRightTransposed = Boolean.parseBoolean(parts[6]);
+			AggregateBinaryOperator aggbin = InstructionUtils.getMatMultOperator(k);
+			return new AggregateBinaryCPInstruction(aggbin, in1, in2, out, opcode, str, isLeftTransposed,
+				isRightTransposed);
+		}
+		else {
+			throw new DMLRuntimeException("NumFields expected number  (" + 4 + " or " + 6
+				+ ") != is not equal to actual number (" + numFields + ").");
+		}
 	}
-	
+
 	@Override
 	public void processInstruction(ExecutionContext ec) {
-		//get inputs
+		// get inputs
 		MatrixBlock matBlock1 = ec.getMatrixInput(input1.getName());
 		MatrixBlock matBlock2 = ec.getMatrixInput(input2.getName());
-		
-		//compute matrix multiplication
+
+		// compute matrix multiplication
 		AggregateBinaryOperator ab_op = (AggregateBinaryOperator) _optr;
-		MatrixBlock main = (matBlock2 instanceof CompressedMatrixBlock) ? matBlock2 : matBlock1;
-		MatrixBlock ret = main.aggregateBinaryOperations(matBlock1, matBlock2, new MatrixBlock(), ab_op);
-		
-		//release inputs/outputs
+		MatrixBlock ret;
+
+		if(matBlock1 instanceof CompressedMatrixBlock) {
+			CompressedMatrixBlock main = (CompressedMatrixBlock) matBlock1;
+			ret = main.aggregateBinaryOperations(matBlock1, matBlock2, new MatrixBlock(), ab_op, transposeLeft, transposeRight);
+		}
+		else if(matBlock2 instanceof CompressedMatrixBlock) {
+			CompressedMatrixBlock main = (CompressedMatrixBlock) matBlock2;
+			ret = main.aggregateBinaryOperations(matBlock1, matBlock2, new MatrixBlock(), ab_op, transposeLeft, transposeRight);
+		}
+		else {
+			// TODO: move the rewrite rule here:
+			// t(x) %*% y -> t(t(y) %*% x)
+			if(transposeLeft){
+				ReorgOperator r_op = new ReorgOperator(SwapIndex.getSwapIndexFnObject(), ab_op.getNumThreads());
+				matBlock1 = matBlock1.reorgOperations(r_op, new MatrixBlock(), 0, 0, 0);
+			}
+			if(transposeRight){
+				ReorgOperator r_op = new ReorgOperator(SwapIndex.getSwapIndexFnObject(), ab_op.getNumThreads());
+				matBlock2 = matBlock2.reorgOperations(r_op, new MatrixBlock(), 0, 0, 0);
+			}
+			ret = matBlock1.aggregateBinaryOperations(matBlock1, matBlock2, new MatrixBlock(), ab_op);
+		}
+
+		// release inputs/outputs
 		ec.releaseMatrixInput(input1.getName());
 		ec.releaseMatrixInput(input2.getName());
 		ec.setMatrixOutput(output.getName(), ret);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryMatrixMatrixCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryMatrixMatrixCPInstruction.java
index f198770..784b46e 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryMatrixMatrixCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryMatrixMatrixCPInstruction.java
@@ -19,6 +19,10 @@
 
 package org.apache.sysds.runtime.instructions.cp;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.AbstractCompressedMatrixBlock;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.matrix.data.LibCommonsMath;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -26,6 +30,7 @@
 import org.apache.sysds.runtime.matrix.operators.Operator;
 
 public class BinaryMatrixMatrixCPInstruction extends BinaryCPInstruction {
+	private static final Log LOG = LogFactory.getLog(BinaryMatrixMatrixCPInstruction.class.getName());
 
 	protected BinaryMatrixMatrixCPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out,
 			String opcode, String istr) {
@@ -40,16 +45,28 @@
 			ec.setMatrixOutput(output.getName(), solution);
 			ec.releaseMatrixInput(input1.getName());
 			ec.releaseMatrixInput(input2.getName());
+			
 			return;
 		}
 		
 		// Read input matrices
 		MatrixBlock inBlock1 = ec.getMatrixInput(input1.getName());
 		MatrixBlock inBlock2 = ec.getMatrixInput(input2.getName());
-		
+
 		// Perform computation using input matrices, and produce the result matrix
 		BinaryOperator bop = (BinaryOperator) _optr;
-		MatrixBlock retBlock = inBlock1.binaryOperations (bop, inBlock2, new MatrixBlock());
+		MatrixBlock retBlock;
+
+		if(inBlock1 instanceof CompressedMatrixBlock && inBlock2 instanceof CompressedMatrixBlock){
+			retBlock = inBlock1.binaryOperations(bop, inBlock2, new MatrixBlock());
+		} else if(inBlock2 instanceof CompressedMatrixBlock){
+			LOG.error("Binary CP instruction decompressing " + bop);
+			LOG.error("inBlock2 stats: " + inBlock2.getNumRows() + "  " +inBlock2.getNumColumns());
+			inBlock2 = AbstractCompressedMatrixBlock.getUncompressed(inBlock2);
+			retBlock = inBlock1.binaryOperations(bop, inBlock2, new MatrixBlock());
+		} else {
+			retBlock = inBlock1.binaryOperations(bop, inBlock2, new MatrixBlock());
+		}
 		
 		// Release the memory occupied by input matrices
 		ec.releaseMatrixInput(input1.getName(), input2.getName());
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/operators/AggregateBinaryOperator.java b/src/main/java/org/apache/sysds/runtime/matrix/operators/AggregateBinaryOperator.java
index 8bfaeef..506abea 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/operators/AggregateBinaryOperator.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/operators/AggregateBinaryOperator.java
@@ -17,38 +17,33 @@
  * under the License.
  */
 
-
 package org.apache.sysds.runtime.matrix.operators;
 
-import java.io.Serializable;
-
 import org.apache.sysds.runtime.functionobjects.Multiply;
 import org.apache.sysds.runtime.functionobjects.Plus;
 import org.apache.sysds.runtime.functionobjects.ValueFunction;
 
-
-public class AggregateBinaryOperator extends Operator implements Serializable
-{
+public class AggregateBinaryOperator extends Operator {
 	private static final long serialVersionUID = 1666421325090925726L;
 
 	public final ValueFunction binaryFn;
 	public final AggregateOperator aggOp;
-	private final int k; //num threads
-	
+	private final int k; // num threads
+
 	public AggregateBinaryOperator(ValueFunction inner, AggregateOperator outer) {
-		//default degree of parallelism is 1 
-		//(for example in MR/Spark because we parallelize over the number of blocks)
-		this( inner, outer, 1 );
+		// default degree of parallelism is 1
+		// (for example in MR/Spark because we parallelize over the number of blocks)
+		this(inner, outer, 1);
 	}
-	
+
 	public AggregateBinaryOperator(ValueFunction inner, AggregateOperator outer, int numThreads) {
-		//so far, we only support matrix multiplication, and it is sparseSafe
+		// so far, we only support matrix multiplication, and it is sparseSafe
 		super(inner instanceof Multiply && outer.increOp.fn instanceof Plus);
 		binaryFn = inner;
 		aggOp = outer;
 		k = numThreads;
 	}
-	
+
 	public int getNumThreads() {
 		return k;
 	}
diff --git a/src/main/java/org/apache/sysds/utils/DMLCompressionStatistics.java b/src/main/java/org/apache/sysds/utils/DMLCompressionStatistics.java
index 26ae97d..f28fa0f 100644
--- a/src/main/java/org/apache/sysds/utils/DMLCompressionStatistics.java
+++ b/src/main/java/org/apache/sysds/utils/DMLCompressionStatistics.java
@@ -21,6 +21,8 @@
 
 public class DMLCompressionStatistics {
 
+	// Compute transpose of input
+	private static double Phase0 = 0.0;
 	// Compute compressed size info
 	private static double Phase1 = 0.0;
 	// Co-code columns
@@ -39,6 +41,9 @@
 
 	public static void addCompressionTime(double time, int phase) {
 		switch(phase) {
+			case 0:
+				Phase0 += time;
+				break;
 			case 1:
 				Phase1 += time;
 				break;
@@ -78,8 +83,10 @@
 	}
 
 	public static void display(StringBuilder sb) {
+		
 		sb.append(String.format(
-			"CLA Compression Phases (classify, group, compress, share, clean) :\t%.3f/%.3f/%.3f/%.3f/%.3f\n",
+			"CLA Compression Phases (transpose, classify, group, compress, share, clean) :\t%.3f/%.3f/%.3f/%.3f/%.3f/%.3f\n",
+			Phase0 / 1000,
 			Phase1 / 1000,
 			Phase2 / 1000,
 			Phase3 / 1000,
diff --git a/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java b/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java
index 8cce3d0..bf9b958 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java
@@ -170,7 +170,7 @@
 			MatrixBlock ret1 = mb.aggregateUnaryOperations(auop, new MatrixBlock(), Math.max(rows, cols), null, true);
 			// matrix-vector compressed
 			MatrixBlock ret2 = cmb.aggregateUnaryOperations(auop, new MatrixBlock(), Math.max(rows, cols), null, true);
-
+			// LOG.error(cmb);
 			// compare result with input
 			double[][] d1 = DataConverter.convertToDoubleMatrix(ret1);
 			double[][] d2 = DataConverter.convertToDoubleMatrix(ret2);
diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
index 2e13619..753d4fd 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
@@ -80,17 +80,15 @@
 	protected static ValueType[] usedValueTypes = new ValueType[] {
 		// ValueType.RAND,
 		// ValueType.CONST,
-		ValueType.RAND_ROUND, 
+		ValueType.RAND_ROUND,
 		// ValueType.OLE_COMPRESSIBLE,
 		// ValueType.RLE_COMPRESSIBLE,
 	};
 
-	protected static ValueRange[] usedValueRanges = new ValueRange[] {
-		ValueRange.SMALL,
+	protected static ValueRange[] usedValueRanges = new ValueRange[] {ValueRange.SMALL,
 		// ValueRange.LARGE,
 		// ValueRange.BYTE,
-		ValueRange.BOOLEAN,
-	};
+		ValueRange.BOOLEAN,};
 
 	protected static OverLapping[] overLapping = new OverLapping[] {OverLapping.COL,
 		// OverLapping.MATRIX,
@@ -100,19 +98,17 @@
 
 	private static final int compressionSeed = 7;
 
-	protected static CompressionSettings[] usedCompressionSettings = new CompressionSettings[] {
+	protected static CompressionSettingsBuilder[] usedCompressionSettings = new CompressionSettingsBuilder[] {
 		// CLA TESTS!
-
 		new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed)
-			.setValidCompressions(EnumSet.of(CompressionType.DDC)).setInvestigateEstimate(true).create(),
+			.setValidCompressions(EnumSet.of(CompressionType.DDC)).setInvestigateEstimate(true),
 		new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed)
-			.setValidCompressions(EnumSet.of(CompressionType.OLE)).setInvestigateEstimate(true).create(),
+			.setValidCompressions(EnumSet.of(CompressionType.OLE)).setInvestigateEstimate(true),
 		new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed)
-			.setValidCompressions(EnumSet.of(CompressionType.RLE)).setInvestigateEstimate(true).create(),
-		new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed).setInvestigateEstimate(true)
-			.create(),
+			.setValidCompressions(EnumSet.of(CompressionType.RLE)).setInvestigateEstimate(true),
+		new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed).setInvestigateEstimate(true),
 		new CompressionSettingsBuilder().setSamplingRatio(1.0).setSeed(compressionSeed).setInvestigateEstimate(true)
-			.setAllowSharedDictionary(false).setmaxStaticColGroupCoCode(1).create(),
+			.setAllowSharedDictionary(false).setmaxStaticColGroupCoCode(1),
 
 		// // // // LOSSY TESTS!
 
@@ -140,12 +136,12 @@
 
 	protected static MatrixTypology[] usedMatrixTypology = new MatrixTypology[] { // Selected Matrix Types
 		// MatrixTypology.SMALL,
-		MatrixTypology.FEW_COL,
+		// MatrixTypology.FEW_COL,
 		// MatrixTypology.FEW_ROW,
 		// MatrixTypology.LARGE,
 		// // MatrixTypology.SINGLE_COL,
 		// MatrixTypology.SINGLE_ROW,
-		// MatrixTypology.L_ROWS,
+		MatrixTypology.L_ROWS,
 		// MatrixTypology.XL_ROWS,
 		// MatrixTypology.SINGLE_COL_L
 	};
@@ -205,8 +201,8 @@
 
 					// vector-matrix compressed
 					cmb = cmb.aggregateBinaryOperations(cmb, tmp, new MatrixBlock(), abop);
-					if(ov == OverLapping.MATRIX_PLUS) {
 
+					if(ov == OverLapping.MATRIX_PLUS) {
 						ScalarOperator sop = new LeftScalarOperator(Plus.getPlusFnObject(), 15);
 						mb = mb.scalarOperations(sop, new MatrixBlock());
 						cmb = cmb.scalarOperations(sop, new MatrixBlock());
@@ -245,10 +241,10 @@
 		for(SparsityType st : usedSparsityTypes)
 			for(ValueType vt : usedValueTypes)
 				for(ValueRange vr : usedValueRanges)
-					for(CompressionSettings cs : usedCompressionSettings)
+					for(CompressionSettingsBuilder cs : usedCompressionSettings)
 						for(MatrixTypology mt : usedMatrixTypology)
 							for(OverLapping ov : overLapping)
-								tests.add(new Object[] {st, vt, vr, cs, mt, ov});
+								tests.add(new Object[] {st, vt, vr, cs.create(), mt, ov});
 		return tests;
 	}
 
@@ -260,7 +256,11 @@
 				// Assert.assertTrue("Compression Failed \n" + this.toString(), false);
 			}
 			double[][] org = DataConverter.convertToDoubleMatrix(mb);
-			double[][] deCompressed = DataConverter.convertToDoubleMatrix(((CompressedMatrixBlock) cmb).decompress(_k));
+			// LOG.error(mb.slice(0, 10, 0, mb.getNumColumns() -1, null));
+			MatrixBlock decompressedMatrixBlock = ((CompressedMatrixBlock) cmb).decompress(_k);
+			// LOG.error(decompressedMatrixBlock.slice(0,10, 0, decompressedMatrixBlock.getNumColumns()-1, null));
+			double[][] deCompressed = DataConverter.convertToDoubleMatrix(decompressedMatrixBlock);
+
 			if(compressionSettings.lossy)
 				TestUtils.compareMatrices(org, deCompressed, lossyTolerance, this.toString());
 			else if(overlappingType == OverLapping.MATRIX_MULT_NEGATIVE || overlappingType == OverLapping.MATRIX_PLUS ||
@@ -334,78 +334,108 @@
 
 	@Test
 	public void testVectorMatrixMult() {
-
-		if(!(cmb instanceof CompressedMatrixBlock))
-			return; // Input was not compressed then just pass test
-
 		MatrixBlock vector = DataConverter
 			.convertToMatrixBlock(TestUtils.generateTestMatrix(1, rows, 0.9, 1.5, 1.0, 3));
-
 		testLeftMatrixMatrix(vector);
 	}
 
 	@Test
 	public void testLeftMatrixMatrixMultSmall() {
-
-		if(!(cmb instanceof CompressedMatrixBlock))
-			return; // Input was not compressed then just pass test
-
 		MatrixBlock matrix = DataConverter
 			.convertToMatrixBlock(TestUtils.generateTestMatrix(3, rows, 0.9, 1.5, 1.0, 3));
-
 		testLeftMatrixMatrix(matrix);
 
 	}
 
 	@Test
 	public void testLeftMatrixMatrixMultMedium() {
-
-		if(!(cmb instanceof CompressedMatrixBlock))
-			return; // Input was not compressed then just pass test
-
 		MatrixBlock matrix = DataConverter
 			.convertToMatrixBlock(TestUtils.generateTestMatrix(50, rows, 0.9, 1.5, 1.0, 3));
-
 		testLeftMatrixMatrix(matrix);
 	}
 
 	@Test
 	public void testLeftMatrixMatrixMultSparse() {
-
-		if(!(cmb instanceof CompressedMatrixBlock))
-			return; // Input was not compressed then just pass test
-
 		MatrixBlock matrix = DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrix(2, rows, 0.9, 1.5, .1, 3));
+		testLeftMatrixMatrix(matrix);
+	}
 
+	// @Test
+	// public void testLeftMatrixMatrixMultSparse2() {
+	// MatrixBlock matrix = DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrix(2, rows, 0.9, 1.5, .1, 3));
+	// SparseBlock sb = matrix.getSparseBlock();
+	// sb.deleteIndexRange(0, 0, rows);
+	// testLeftMatrixMatrix(matrix);
+	// }
+
+	// @Test
+	// public void testLeftMatrixMatrixMultSparse3() {
+	// MatrixBlock matrix = DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrix(2, rows, 0.9, 1.5, .1, 3));
+	// SparseBlock sb = matrix.getSparseBlock();
+	// sb.deleteIndexRange(0, 0, rows - 2);
+	// sb.deleteIndexRange(1, 0, rows/10*9);
+	// LOG.error(matrix);
+	// testLeftMatrixMatrix(matrix);
+	// }
+
+	@Test
+	public void testLeftMatrixMatrixMultSparseCustom() {
+		MatrixBlock matrix = new MatrixBlock(2, rows, true);
+		matrix.quickSetValue(1, rows - 1, 99);
+		testLeftMatrixMatrix(matrix);
+	}
+
+	@Test
+	public void testLeftMatrixMatrixMultSparseCustom2() {
+		MatrixBlock matrix = new MatrixBlock(2, rows, true);
+		matrix.quickSetValue(1, 0, 99);
+		testLeftMatrixMatrix(matrix);
+	}
+
+	@Test
+	public void testLeftMatrixMatrixMultSparseCustom3() {
+		MatrixBlock matrix = new MatrixBlock(2, rows, true);
+		matrix.quickSetValue(0, 0, -99);
+		matrix.quickSetValue(1, 0, 99);
+		testLeftMatrixMatrix(matrix);
+	}
+
+	@Test
+	public void testLeftMatrixMatrixMultSparseCustom4() {
+		MatrixBlock matrix = new MatrixBlock(2, rows, true);
+		matrix.quickSetValue(0, rows - 1, -99);
+		matrix.quickSetValue(1, 0, 99);
 		testLeftMatrixMatrix(matrix);
 	}
 
 	public void testLeftMatrixMatrix(MatrixBlock matrix) {
+		if(!(cmb instanceof CompressedMatrixBlock))
+			return; // Input was not compressed then just pass test
 		try {
 			// Make Operator
 			AggregateBinaryOperator abop = InstructionUtils.getMatMultOperator(_k);
 
 			// vector-matrix uncompressed
-			MatrixBlock ret1 = mb.aggregateBinaryOperations(matrix, mb, new MatrixBlock(), abop);
 
 			// vector-matrix compressed
 			MatrixBlock ret2 = cmb.aggregateBinaryOperations(matrix, cmb, new MatrixBlock(), abop);
+			MatrixBlock ret1 = mb.aggregateBinaryOperations(matrix, mb, new MatrixBlock(), abop);
 
 			// compare result with input
 			double[][] d1 = DataConverter.convertToDoubleMatrix(ret1);
 			double[][] d2 = DataConverter.convertToDoubleMatrix(ret2);
 			if(compressionSettings.lossy) {
-				TestUtils.compareMatricesPercentageDistance(d1, d2, 0.25, 0.83, compressionSettings.toString());
+				TestUtils.compareMatricesPercentageDistance(d1, d2, 0.25, 0.83, this.toString());
 			}
 			else {
 				if(rows > 65000)
-					TestUtils.compareMatricesPercentageDistance(d1, d2, 0.99, 0.99, compressionSettings.toString());
+					TestUtils.compareMatricesPercentageDistance(d1, d2, 0.99, 0.99, this.toString());
 				else if(overlappingType == OverLapping.MATRIX_MULT_NEGATIVE ||
 					overlappingType == OverLapping.MATRIX_PLUS || overlappingType == OverLapping.MATRIX ||
 					overlappingType == OverLapping.COL)
 					TestUtils.compareMatricesBitAvgDistance(d1, d2, 1500000, 1000, this.toString());
 				else
-					TestUtils.compareMatricesBitAvgDistance(d1, d2, 24000, 512, compressionSettings.toString());
+					TestUtils.compareMatricesBitAvgDistance(d1, d2, 24000, 512, this.toString());
 
 			}
 		}
@@ -579,7 +609,6 @@
 		testScalarOperations(sop, lossyTolerance * 7);
 	}
 
-
 	@Test
 	public void testScalarOpRightMultiplyNegative() {
 		double mult = -7;
diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedVectorTest.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedVectorTest.java
index 1723d83..89f7ddc 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/CompressedVectorTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedVectorTest.java
@@ -26,6 +26,7 @@
 
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.compress.CompressionSettings;
+import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
 import org.apache.sysds.runtime.functionobjects.CM;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.CMOperator;
@@ -55,10 +56,10 @@
 		for(SparsityType st : usedSparsityTypes)
 			for(ValueType vt : usedValueTypes)
 				for(ValueRange vr : usedValueRanges)
-					for(CompressionSettings cs : usedCompressionSettings)
+					for(CompressionSettingsBuilder cs : usedCompressionSettings)
 						for(MatrixTypology mt : usedMatrixTypologyLocal)
 							for(OverLapping ov : overLapping)
-								tests.add(new Object[] {st, vt, vr, cs, mt, ov});
+								tests.add(new Object[] {st, vt, vr, cs.create(), mt, ov});
 
 		return tests;
 	}
diff --git a/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java b/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java
index e9cfb1d..eb7e5ce 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java
@@ -28,8 +28,8 @@
 	private static final int cols[] = {20, 20, 13, 1, 321, 1, 5, 1, 1};
 	private static final double[] sparsityValues = {0.9, 0.1, 0.01, 0.0, 1.0};
 
-	private static final int[] mins = {-10, -127 * 2};
-	private static final int[] maxs = {10, 127};
+	private static final int[] mins = {-4, -127 * 2};
+	private static final int[] maxs = {5, 127};
 
 	public enum SparsityType {
 		DENSE, SPARSE, ULTRA_SPARSE, EMPTY, FULL
diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java
index 6e085d4..7116503 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java
@@ -23,6 +23,8 @@
 
 import java.util.EnumSet;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.compress.BitmapEncoder;
 import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
@@ -41,6 +43,8 @@
 @RunWith(value = Parameterized.class)
 public abstract class JolEstimateTest {
 
+	protected static final Log LOG = LogFactory.getLog(JolEstimateTest.class.getName());
+
 	protected static final CompressionType ddc = CompressionType.DDC;
 	protected static final CompressionType ole = CompressionType.OLE;
 	protected static final CompressionType rle = CompressionType.RLE;
@@ -66,15 +70,16 @@
 			.setValidCompressions(vc);
 		this.cs = csb.create();
 		this.csl = csb.setLossy(true).setSortValuesByLength(false).create();
-
+		cs.transposed = true;
+		csl.transposed = true;
 		int[] colIndexes = new int[mbt.getNumRows()];
 		for(int x = 0; x < mbt.getNumRows(); x++) {
 			colIndexes[x] = x;
 		}
 		try {
-			ABitmap ubm = BitmapEncoder.extractBitmap(colIndexes, mbt, cs);
+			ABitmap ubm = BitmapEncoder.extractBitmap(colIndexes, mbt, true);
 			cg = ColGroupFactory.compress(colIndexes, mbt.getNumColumns(), ubm, getCT(), cs, mbt);
-			ABitmap ubml = BitmapEncoder.extractBitmap(colIndexes, mbt, csl);
+			ABitmap ubml = BitmapEncoder.extractBitmap(colIndexes, mbt, true);
 			cgl = ColGroupFactory.compress(colIndexes, mbt.getNumColumns(), ubml, getCT(), csl, mbt);
 
 		}
@@ -87,10 +92,12 @@
 	@Test
 	public void compressedSizeInfoEstimatorExact() {
 		try {
-			// CompressionSettings cs = new CompressionSettings(1.0);
-			CompressedSizeEstimator cse = CompressedSizeEstimatorFactory.getSizeEstimator(mbt, cs);
+			CompressionSettings cs = new CompressionSettingsBuilder().setSamplingRatio(1.0).setValidCompressions(EnumSet.of(getCT())).create();
+			CompressedSizeEstimator cse = CompressedSizeEstimatorFactory.getSizeEstimator(mbt, cs, true);
+
 			CompressedSizeInfoColGroup csi = cse.estimateCompressedColGroupSize();
 			long estimateCSI = csi.getCompressionSize(getCT());
+
 			long estimateObject = cg.estimateInMemorySize();
 			String errorMessage = "CSI estimate " + estimateCSI + " should be exactly " + estimateObject + "\n"
 				+ cg.toString();
@@ -112,10 +119,11 @@
 	public void compressedSizeInfoEstimatorExactLossy() {
 		try {
 			// CompressionSettings cs = new CompressionSettings(1.0);
-			CompressedSizeEstimator cse = CompressedSizeEstimatorFactory.getSizeEstimator(mbt, csl);
+			CompressedSizeEstimator cse = CompressedSizeEstimatorFactory.getSizeEstimator(mbt, csl, true);
 			CompressedSizeInfoColGroup csi = cse.estimateCompressedColGroupSize();
 			long estimateCSI = csi.getCompressionSize(getCT());
 			long estimateObject = cgl.estimateInMemorySize();
+
 			String errorMessage = "CSI estimate " + estimateCSI + " should be exactly " + estimateObject + "\n"
 				+ cg.toString();
 			boolean res = Math.abs(estimateCSI - estimateObject) <= tolerance;