MAHOUT-1837: Sparse/Dense Matrix analysis for Matrix Multiplication. closes apache/mahout#228
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index 86c7054..cdec954 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -23,6 +23,7 @@
 
 import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.logical.OpAewUnaryFunc
+
 import collection._
 
 package object drm {
@@ -354,7 +355,6 @@
       keys → block
     }
   }
-
 }
 
 package object indexeddataset {
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala
index d72d2f0..6389806 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala
@@ -35,7 +35,7 @@
 
     val (af, bf) = (a.getFlavor, b.getFlavor)
     val backs = (af.getBacking, bf.getBacking)
-    val sd = (af.getStructure, af.isDense, bf.getStructure, bf.isDense)
+    val sd = (af.getStructure, sparsityAnalysis(a), bf.getStructure, sparsityAnalysis(b))
 
     val alg: MMulAlg = backs match {
 
@@ -124,7 +124,7 @@
     require(r.forall(mxR ⇒ mxR.nrow == a.nrow && mxR.ncol == b.ncol))
     val (m, n) = (a.nrow, b.ncol)
 
-    val mxR = r.getOrElse(if (a.getFlavor.isDense) a.like(m, n) else b.like(m, n))
+    val mxR = r.getOrElse(if (sparsityAnalysis(a)) a.like(m, n) else b.like(m, n))
 
     for (row ← 0 until mxR.nrow; col ← 0 until mxR.ncol) {
       // this vector-vector should be sort of optimized, right?
@@ -269,10 +269,10 @@
     val (m, n) = (a.nrow, b.ncol)
 
     // Prefer col-wise result iff a is dense and b is sparse. In all other cases default to row-wise.
-    val preferColWiseR = a.getFlavor.isDense && !b.getFlavor.isDense
+    val preferColWiseR = sparsityAnalysis(a) && !sparsityAnalysis(b)
 
     val mxR = r.getOrElse {
-      (a.getFlavor.isDense, preferColWiseR) match {
+      (sparsityAnalysis(a), preferColWiseR) match {
         case (false, false) ⇒ b.like(m, n)
         case (false, true) ⇒ b.like(n, m).t
         case (true, false) ⇒ a.like(m, n)
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
index 10c534b..71a2c11 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
@@ -18,7 +18,9 @@
 package org.apache.mahout.math
 
 import org.apache.mahout.math.solver.EigenDecomposition
+
 import collection._
+import scala.util.Random
 
 /**
  * Mahout matrices and vectors' scala syntactic sugar
@@ -29,6 +31,12 @@
   // Reserved "ALL" range
   final val `::`: Range = null
 
+  // values for stochastic sparsityAnalysis
+  final val z95 = 1.959964
+  final val z80 = 1.281552
+  final val maxSamples = 500
+  final val minSamples = 15
+
   // Some enums
   object AutoBooleanEnum extends Enumeration {
     type T = Value
@@ -410,4 +418,57 @@
 
   def dist(mxX: Matrix, mxY: Matrix): Matrix = sqDist(mxX, mxY) := sqrt _
 
+  /**
+    * Check the density of an in-core matrix based on supplied criteria.
+    * Returns true if we think mx is denser than the threshold with at least 80% confidence.
+    *
+    * @param mx  The matrix to check density of.
+    * @param threshold the fraction of non-zero elements above which we consider the matrix dense
+    */
+  def sparsityAnalysis(mx: Matrix, threshold: Double = 0.25): Boolean = {
+
+    require(threshold >= 0.0 && threshold <= 1.0)
+    var n = minSamples
+    var mean = 0.0
+    val rnd = new Random()
+    val dimm = mx.nrow
+    val dimn = mx.ncol
+    val pq = threshold * (1 - threshold)
+
+    for (s ← 0 until minSamples) {
+      if (mx(rnd.nextInt(dimm), rnd.nextInt(dimn)) != 0.0) mean += 1
+    }
+    mean /= minSamples
+    val iv = z80 * math.sqrt(pq / n)
+
+    if (mean < threshold - iv) return false // sparse
+    else if (mean > threshold + iv) return true // dense
+
+    while (n < maxSamples) {
+      // Determine the upper bound on n that we may need to likely resolve the uncertainty.
+      // Here, we use the confidence interval formula, but solved for n.
+      val ivNeeded = math.abs(threshold - mean) max 1e-11
+
+      val stderr = ivNeeded / z80
+      val nNeeded = (math.ceil(pq / (stderr * stderr)).toInt max n min maxSamples) - n
+
+      var meanNext = 0.0
+      for (s ← 0 until nNeeded) {
+        if (mx(rnd.nextInt(dimm), rnd.nextInt(dimn)) != 0.0) meanNext += 1
+      }
+      mean = (n * mean + meanNext) / (n + nNeeded)
+      n += nNeeded
+
+      // Are we good now?
+      val iv = z80 * math.sqrt(pq / n)
+      if (mean < threshold - iv) return false // sparse
+      else if (mean > threshold + iv) return true // dense
+    }
+
+    return mean <= threshold
+
+  }
+
+
+
 }
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
index 0503e49..264e375 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MathSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.mahout.math.scalabindings
 
+import org.apache.log4j.Level
+
 import org.apache.mahout.logging._
 import org.apache.mahout.math._
 import org.apache.mahout.math.scalabindings.RLikeOps._
@@ -234,4 +236,32 @@
     (mxDsq - mxDsqControl).norm should be < 1e-7
   }
 
+  test("sparsity analysis") {
+    setLogLevel(Level.DEBUG)
+
+    val m = 500
+    val n = 800
+    val mxA = new DenseMatrix(m, n)
+
+    sparsityAnalysis(mxA) shouldBe false
+    sparsityAnalysis(mxA, .5) shouldBe false
+    sparsityAnalysis(mxA + 1) shouldBe true
+    sparsityAnalysis(mxA + 1, .95) shouldBe true
+
+    for (i ← 0 until m by 5) mxA(i, ::) := 1
+    info(s"20% detected as dense?:${sparsityAnalysis(mxA)}")
+    mxA := 0
+
+    for (i ← 0 until m by 3) mxA(i, ::) := 1
+    info(s"33% detected as dense?:${sparsityAnalysis(mxA)}")
+    mxA := 0
+
+    for (i ← 0 until m by 4) mxA(i, ::) := 1
+    info(s"25% detected as dense?:${sparsityAnalysis(mxA)}")
+
+    for (i ← 0 until m by 2) mxA(i, ::) := 1
+    info(s"50% detected as dense?:${sparsityAnalysis(mxA)}")
+
+  }
+
 }
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index 676b496..b57d8ae 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -20,13 +20,15 @@
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.spark.rdd.RDD
+
 import scala.reflect.ClassTag
 import org.apache.mahout.sparkbindings._
 import org.apache.mahout.math.drm.BlockifiedDrmTuple
 import org.apache.mahout.sparkbindings.drm._
-import org.apache.mahout.math.{SparseMatrix, Matrix, SparseRowMatrix}
+import org.apache.mahout.math.{DenseMatrix, Matrix, SparseMatrix, SparseRowMatrix}
 import org.apache.mahout.math.drm.logical.OpABt
 import org.apache.mahout.logging._
+import org.apache.mahout.math.flavor.MatrixFlavor
 
 /** Contains RDD plans for ABt operator */
 object ABt {
@@ -116,7 +118,12 @@
           // Empty combiner += value
           createCombiner = (t: (Array[K], Array[Int], Matrix)) =>  {
             val (rowKeys, colKeys, block) = t
-            val comb = new SparseMatrix(prodNCol, block.nrow).t
+
+            val comb = if (block.getFlavor == MatrixFlavor.SPARSELIKE) {
+              new SparseMatrix(prodNCol, block.nrow).t
+            } else {
+              new DenseMatrix(prodNCol, block.nrow).t
+            }
 
             for ((col, i) <- colKeys.zipWithIndex) comb(::, col) := block(::, i)
             rowKeys -> comb
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
index 64065d9..aca7c3c 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
@@ -60,19 +60,27 @@
         val keys = data.map(t => t._1).toArray[K]
         val vectors = data.map(t => t._2).toArray
 
-        val block = if (vectors(0).isDense) {
-          val block = new DenseMatrix(vectors.length, blockncol)
-          var row = 0
-          while (row < vectors.length) {
-            block(row, ::) := vectors(row)
-            row += 1
-          }
+        // Create the block as dense by default.
+        // It would probably be better to sample a subset of these
+        // vectors before materializing the entire matrix, so that
+        // we avoid the overhead of building a full second matrix
+        // when the data turns out not to be dense.
+        val block = new DenseMatrix(vectors.length, blockncol)
+        var row = 0
+        while (row < vectors.length) {
+          block(row, ::) := vectors(row)
+          row += 1
+        }
+
+        // Test the density of the data. If the matrix does not meet the
+        // requirements for density, convert the Vectors to a sparse Matrix.
+        val resBlock = if (sparsityAnalysis(block)) {
           block
         } else {
           new SparseRowMatrix(vectors.length, blockncol, vectors, true, false)
         }
 
-        Iterator(keys -> block)
+        Iterator(keys -> resBlock)
       }
     })
   }
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
index a5cb7f8..8f9b00f 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala
@@ -49,7 +49,7 @@
     }).norm should be < 1e-4
   }
 
-  test("DRM blockify sparse -> SRM") {
+  test("DRM blockify sparse -> DRM") {
 
     val inCoreA = sparse(
       (1, 2, 3),
@@ -59,7 +59,7 @@
 
     (inCoreA - drmA.mapBlock() {
       case (keys, block) =>
-        if (!block.isInstanceOf[SparseRowMatrix])
+        if (block.isInstanceOf[SparseRowMatrix])
           throw new AssertionError("Block must be dense.")
         keys -> block
     }).norm should be < 1e-4