[SYSTEMDS-2744] Minor performance improvements sparse operations

* Dense-sparse vector transpose
* Sparse-sparse matrix multiplication with small right hand side
* Avoid unnecessary overhead for scanning for permutation matmult
diff --git a/src/main/java/org/apache/sysds/runtime/data/SparseRowVector.java b/src/main/java/org/apache/sysds/runtime/data/SparseRowVector.java
index 6d60293..91fab8eb 100644
--- a/src/main/java/org/apache/sysds/runtime/data/SparseRowVector.java
+++ b/src/main/java/org/apache/sysds/runtime/data/SparseRowVector.java
@@ -49,6 +49,18 @@
 		indexes = new int[capacity];
 	}
 	
+	public SparseRowVector(int nnz, double[] v, int vlen) {
+		values = new double[nnz];
+		indexes = new int[nnz];
+		for(int i=0, pos=0; i<vlen; i++)
+			if( v[i] != 0 ) {
+				values[pos] = v[i];
+				indexes[pos] = i;
+				pos++;
+			}
+		size = nnz;
+	}
+	
 	public SparseRowVector(int estnnz, int maxnnz) {
 		if( estnnz > initialCapacity )
 			estimatedNzs = estnnz;
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
index f0fcad4..2fa3a22 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
@@ -183,6 +183,7 @@
 		return ret;
 	}
 	
+	@SuppressWarnings("unused")
 	private boolean equalsLI(LineageItem that) {
 		if (isVisited() || this == that)
 			return true;
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java
index f8d5436..a117a8d 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java
@@ -1417,89 +1417,116 @@
 		int cd = m1.clen;
 		
 		// MATRIX-MATRIX (VV, MV not applicable here because V always dense)
-		if(LOW_LEVEL_OPTIMIZATION)
-		{
-			if( pm2 && m==1 )          //VECTOR-MATRIX
-			{
-				//parallelization over rows in rhs matrix
-				if( !a.isEmpty(0) ) 
-				{
-					int alen = a.size(0);
-					int[] aix = a.indexes(0);
-					double[] avals = a.values(0);
-					double[] cvals = c.valuesAt(0);
-					int rlix = (rl==0) ? 0 : a.posFIndexGTE(0,rl);
-					rlix = (rlix>=0) ? rlix : alen;
-					
-					for( int k=rlix; k<alen && aix[k]<ru; k++ )
-						if( !b.isEmpty(aix[k]) ) {
-							int bpos = b.pos(aix[k]);
-							int blen = b.size(aix[k]);
-							int[] bix = b.indexes(aix[k]);
-							double[] bvals = b.values(aix[k]);
-							vectMultiplyAdd(avals[k], bvals, cvals, bix, bpos, 0, blen);
-						}
-				}
-			}
-			else                       //MATRIX-MATRIX
-			{
-				//block sizes for best-effort blocking w/ sufficient row reuse in B yet small overhead
-				final int blocksizeI = 32;
-				final int blocksizeK = Math.max(32, UtilFunctions.nextIntPow2(
-					(int)Math.pow((double)m*cd/m1.nonZeros,2)));
-				
-				//temporary array of current sparse positions
-				int[] curk = new int[Math.min(blocksizeI, ru-rl)];
-				
-				//blocked execution over IK 
-				for( int bi = rl; bi < ru; bi+=blocksizeI ) {
-					Arrays.fill(curk, 0); //reset positions
-					for( int bk = 0, bimin = Math.min(ru, bi+blocksizeI); bk < cd; bk+=blocksizeK ) {
-						final int bkmin = Math.min(cd, bk+blocksizeK); 
-						//core sub block matrix multiplication
-						for( int i=bi; i<bimin; i++ ) {
-							if( a.isEmpty(i) ) continue;
-							final int apos = a.pos(i);
-							final int alen = a.size(i);
-							int[] aix = a.indexes(i);
-							double[] avals = a.values(i);
-							double[] cvals = c.values(i);
-							int cix = c.pos(i);
-							int k = curk[i-bi] + apos;
-							for(; k < apos+alen && aix[k]<bkmin; k++) {
-								if( b.isEmpty(aix[k]) ) continue;
-								vectMultiplyAdd(avals[k], b.values(aix[k]), cvals,
-									b.indexes(aix[k]), b.pos(aix[k]), cix, b.size(aix[k]));
-							}
-							curk[i-bi] = k - apos;
-						}
-					}
-				}
-			}
+		if(LOW_LEVEL_OPTIMIZATION) {
+			if( pm2 && m==1 )               //VECTOR-MATRIX
+				matrixMultSparseSparseVM(a, b, c, rl, ru);
+			else if( m2.nonZeros < 2048 )   //MATRIX-SMALL MATRIX
+				matrixMultSparseSparseMMSmallRHS(a, b, c, rl, ru);
+			else                            //MATRIX-MATRIX
+				matrixMultSparseSparseMM(a, b, c, m, cd, m1.nonZeros, rl, ru);
 		}
 		else {
-			for( int i=rl; i<Math.min(ru, a.numRows()); i++ ) {
-				if( a.isEmpty(i) ) continue;
-				int apos = a.pos(i);
-				int alen = a.size(i);
-				int[] aix = a.indexes(i);
-				double[] avals = a.values(i);
-				double[] cvals = c.values(i);
-				int cix = c.pos(i);
-				for(int k = apos; k < apos+alen; k++) {
-					if( b.isEmpty(aix[k]) ) continue;
-					double val = avals[k];
-					int bpos = b.pos(aix[k]);
-					int blen = b.size(aix[k]);
-					int[] bix = b.indexes(aix[k]);
-					double[] bvals = b.values(aix[k]);
-					for(int j = bpos; j < bpos+blen; j++)
-						cvals[cix+bix[j]] += val * bvals[j];
+			matrixMultSparseSparseMMGeneric(a, b, c, rl, ru);
+		}
+	}
+	
+	private static void matrixMultSparseSparseVM(SparseBlock a, SparseBlock b, DenseBlock c, int rl, int ru) {
+		//parallelization over rows in rhs matrix
+		if( a.isEmpty(0) )
+			return;
+		
+		int alen = a.size(0);
+		int[] aix = a.indexes(0);
+		double[] avals = a.values(0);
+		double[] cvals = c.valuesAt(0);
+		int rlix = (rl==0) ? 0 : a.posFIndexGTE(0,rl);
+		rlix = (rlix>=0) ? rlix : alen;
+		
+		for( int k=rlix; k<alen && aix[k]<ru; k++ )
+			if( !b.isEmpty(aix[k]) ) {
+				int bpos = b.pos(aix[k]);
+				int blen = b.size(aix[k]);
+				int[] bix = b.indexes(aix[k]);
+				double[] bvals = b.values(aix[k]);
+				vectMultiplyAdd(avals[k], bvals, cvals, bix, bpos, 0, blen);
+			}
+	}
+	
+	private static void matrixMultSparseSparseMMSmallRHS(SparseBlock a, SparseBlock b, DenseBlock c, int rl, int ru) {
+		for( int i=rl; i<Math.min(ru, a.numRows()); i++ ) {
+			if( a.isEmpty(i) ) continue;
+			final int apos = a.pos(i);
+			final int alen = a.size(i);
+			int[] aix = a.indexes(i);
+			double[] avals = a.values(i);
+			double[] cvals = c.values(i);
+			int cix = c.pos(i);
+			for(int k = apos; k < apos+alen; k++) {
+				int aixk = aix[k];
+				if( b.isEmpty(aixk) ) continue;
+				vectMultiplyAdd(avals[k], b.values(aixk), cvals,
+					b.indexes(aixk), b.pos(aixk), cix, b.size(aixk));
+			}
+		}
+	}
+	
+	private static void matrixMultSparseSparseMM(SparseBlock a, SparseBlock b, DenseBlock c, int m, int cd, long nnz1, int rl, int ru) {
+		//block sizes for best-effort blocking w/ sufficient row reuse in B yet small overhead
+		final int blocksizeI = 32;
+		final int blocksizeK = Math.max(32,
+			UtilFunctions.nextIntPow2((int)Math.pow((double)m*cd/nnz1, 2)));
+		
+		//temporary array of current sparse positions
+		int[] curk = new int[Math.min(blocksizeI, ru-rl)];
+		
+		//blocked execution over IK 
+		for( int bi = rl; bi < ru; bi+=blocksizeI ) {
+			Arrays.fill(curk, 0); //reset positions
+			for( int bk = 0, bimin = Math.min(ru, bi+blocksizeI); bk < cd; bk+=blocksizeK ) {
+				final int bkmin = Math.min(cd, bk+blocksizeK); 
+				//core sub block matrix multiplication
+				for( int i=bi; i<bimin; i++ ) {
+					if( a.isEmpty(i) ) continue;
+					final int apos = a.pos(i);
+					final int alen = a.size(i);
+					int[] aix = a.indexes(i);
+					double[] avals = a.values(i);
+					double[] cvals = c.values(i);
+					int cix = c.pos(i);
+					int k = curk[i-bi] + apos;
+					for(; k < apos+alen && aix[k]<bkmin; k++) {
+						if( b.isEmpty(aix[k]) ) continue;
+						vectMultiplyAdd(avals[k], b.values(aix[k]), cvals,
+							b.indexes(aix[k]), b.pos(aix[k]), cix, b.size(aix[k]));
+					}
+					curk[i-bi] = k - apos;
 				}
 			}
 		}
 	}
-
+	
+	private static void matrixMultSparseSparseMMGeneric(SparseBlock a, SparseBlock b, DenseBlock c, int rl, int ru) {
+		for( int i=rl; i<Math.min(ru, a.numRows()); i++ ) {
+			if( a.isEmpty(i) ) continue;
+			int apos = a.pos(i);
+			int alen = a.size(i);
+			int[] aix = a.indexes(i);
+			double[] avals = a.values(i);
+			double[] cvals = c.values(i);
+			int cix = c.pos(i);
+			for(int k = apos; k < apos+alen; k++) {
+				if( b.isEmpty(aix[k]) ) continue;
+				double val = avals[k];
+				int bpos = b.pos(aix[k]);
+				int blen = b.size(aix[k]);
+				int[] bix = b.indexes(aix[k]);
+				double[] bvals = b.values(aix[k]);
+				for(int j = bpos; j < bpos+blen; j++)
+					cvals[cix+bix[j]] += val * bvals[j];
+			}
+		}
+	}
+	
 	/**
 	 * This implementation applies to any combination of dense/sparse if at least one
 	 * input is ultrasparse (sparse and very few nnz). In that case, most importantly,
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java
index 891dfbe..edc38a7 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java
@@ -25,6 +25,7 @@
 import org.apache.sysds.runtime.data.DenseBlockFactory;
 import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.data.SparseBlockCSR;
+import org.apache.sysds.runtime.data.SparseRowVector;
 import org.apache.sysds.runtime.functionobjects.DiagIndex;
 import org.apache.sysds.runtime.functionobjects.RevIndex;
 import org.apache.sysds.runtime.functionobjects.SortIndex;
@@ -819,8 +820,8 @@
 		
 		if( out.rlen == 1 ) //VECTOR-VECTOR
 		{
-			c.allocate(0, (int)in.nonZeros); 
-			c.setIndexRange(0, 0, m, a.valuesAt(0), 0, m);
+			//allocate row once by nnz, copy non-zeros
+			c.set(0, new SparseRowVector((int)in.nonZeros, a.valuesAt(0), m), false);
 		}
 		else //general case: MATRIX-MATRIX
 		{
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index e5334fb..4cea827 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -959,7 +959,7 @@
 	}
 	
 	public boolean isSparsePermutationMatrix() {
-		if( !isInSparseFormat() )
+		if( !isInSparseFormat() || nonZeros > rlen )
 			return false;
 		SparseBlock sblock = getSparseBlock();
 		boolean isPM = (sblock != null);
@@ -1040,8 +1040,9 @@
 		boolean sparseDst = evalSparseFormatInMemory(); 
 		
 		//check for empty blocks (e.g., sparse-sparse)
-		if( isEmptyBlock(false) )
+		if( isEmptyBlock(false) ) {
 			cleanupBlock(true, true);
+		}
 		
 		//change representation if required (also done for 
 		//empty blocks in order to set representation flags)
diff --git a/src/test/java/org/apache/sysds/test/component/frame/FrameCastingTest.java b/src/test/java/org/apache/sysds/test/component/frame/FrameCastingTest.java
index 9244fb1..2cf1c4e 100644
--- a/src/test/java/org/apache/sysds/test/component/frame/FrameCastingTest.java
+++ b/src/test/java/org/apache/sysds/test/component/frame/FrameCastingTest.java
@@ -29,8 +29,6 @@
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestUtils;
 
-import java.util.Arrays;
-
 public class FrameCastingTest extends AutomatedTestBase
 {
 	private final static int rows = 2891;