Addressed code review feedback.
Resolved merge conflicts after rebasing onto incubator-spot/master
Fixed minor typos in DNSSuspiciousConnectsAnalysisTest.scala
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
index 7a5266c..315aae6 100644
--- a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
@@ -48,7 +48,7 @@
              ldaSeed: Option[Long],
              ldaAlpha: Double,
              ldaBeta: Double,
-             ldaOptimizer: String,
+             ldaOptimizerOption: String,
              maxIterations: Int,
              precisionUtility: FloatPointPrecisionUtility): SpotLDAOutput = {
 
@@ -82,14 +82,13 @@
         wordDictionary,
         sqlContext)
 
-    val corpusSize = ldaCorpus.count()
-
     docWordCountCache.unpersist()
 
     // Instantiate optimizer based on input
     val ldaOptimizer = ldaOptimizerOption match {
       case "em" => new EMLDAOptimizer
       case "online" => new OnlineLDAOptimizer().setOptimizeDocConcentration(true).setMiniBatchFraction({
+        val corpusSize = ldaCorpus.count()
         if (corpusSize < 2) 0.75
         else (0.05 + 1) / corpusSize
       })
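
Moving the count() inside the mini-batch block means the corpus is only counted
when the "online" optimizer is requested, so the EM path no longer pays for an
extra pass over the corpus. A minimal standalone sketch of the selection logic,
reusing the optimizer names and fraction formula from the hunk above (the
by-name parameter and the fallback case are illustrative additions, not part of
this patch):

    import org.apache.spark.mllib.clustering.{EMLDAOptimizer, LDAOptimizer, OnlineLDAOptimizer}

    // corpusSize is by-name, so the (potentially expensive) count is evaluated
    // only when the "online" branch actually needs it.
    def pickOptimizer(ldaOptimizerOption: String, corpusSize: => Long): LDAOptimizer =
      ldaOptimizerOption match {
        case "em" => new EMLDAOptimizer
        case "online" => new OnlineLDAOptimizer()
          .setOptimizeDocConcentration(true)
          .setMiniBatchFraction(if (corpusSize < 2) 0.75 else (0.05 + 1) / corpusSize)
        case other => throw new IllegalArgumentException(s"Invalid LDA optimizer: $other")
      }

    // e.g. a 1000-document corpus yields miniBatchFraction = 1.05 / 1000 = 0.00105
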
diff --git a/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala b/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
index 2435561..a88a00a 100644
--- a/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
@@ -88,7 +88,7 @@
     precisionUtility = FloatPointPrecisionUtility32)
 
 
-  "dns supicious connects analysis" should "estimate correct probabilities in toy data with framelength anomaly using" +
+  "dns suspicious connects analysis" should "estimate correct probabilities in toy data with framelength anomaly using" +
     " EMLDAOptimizer" in {
 
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
@@ -120,7 +120,7 @@
     val typicalRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "122.2o7.turner.com", "0x00000001", 1, 0)
     val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
     val model = DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, onlineTestConfig, data)
-    val scoredData = model.score(sparkContext, sqlContext, data, onlineTestConfig.userDomain)
+    val scoredData = model.score(sparkContext, sqlContext, data, onlineTestConfig.userDomain, onlineTestConfig.precisionUtility)
     val anomalyScore = scoredData.filter(scoredData(FrameLength) === 1).first().getAs[Double](Score)
     val typicalScores = scoredData.filter(scoredData(FrameLength) === 168).collect().map(_.getAs[Double](Score))
 
@@ -132,7 +132,7 @@
     Math.abs(typicalScores(3) - 0.8d) should be <= 0.01d
   }
 
-  "dns supicious connects analysis" should "estimate correct probabilities in toy data with subdomain length anomaly " +
+  it should "estimate correct probabilities in toy data with subdomain length anomaly " +
     "using EMLDAOptimizer" in {
 
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
@@ -156,7 +156,7 @@
       0)
     val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
     val model = DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, emTestConfig, data)
-    val scoredData = model.score(sparkContext, sqlContext, data, emTestConfig.userDomain)
+    val scoredData = model.score(sparkContext, sqlContext, data, emTestConfig.userDomain, emTestConfig.precisionUtility)
     val anomalyScore = scoredData.
       filter(scoredData(QueryName) === "1111111111111111111111111111111111111111111111111111111111111.tinker.turner.com").
       first().
@@ -171,10 +171,8 @@
     Math.abs(typicalScores(3) - 0.8d) should be <= 0.01d
   }
 
-  "dns supicious connects analysis" should "estimate correct probabilities in toy data with subdomain length anomaly " +
+  it should "estimate correct probabilities in toy data with subdomain length anomaly " +
     "using OnlineLDAOptimizer" in {
-  "dns suspicious connects analysis" should "estimate correct probabilities in toy data with subdomain length anomaly" in {
-  it should "estimate correct probabilities in toy data with subdomain length anomaly" in {
 
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
@@ -195,11 +193,11 @@
       "0x00000001",
       1,
       0)
+
     val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
     val model = DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, onlineTestConfig, data)
-    val scoredData = model.score(sparkContext, sqlContext, data, onlineTestConfig.userDomain)
-    val model = DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, testConfig, data)
-    val scoredData = model.score(sparkContext, sqlContext, data, testConfig.userDomain, testConfig.precisionUtility)
+    val scoredData = model.score(sparkContext, sqlContext, data, onlineTestConfig.userDomain, onlineTestConfig.precisionUtility)
+
     val anomalyScore = scoredData.
       filter(scoredData(QueryName) === "1111111111111111111111111111111111111111111111111111111111111.tinker.turner.com").
       first().
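
The substantive fix in this file is threading precisionUtility from the same
config used for training into model.score, so the score column's type (Float
vs. Double) is decided in exactly one place. The dispatch idea, as an
illustrative stand-in only (the real FloatPointPrecisionUtility trait in
spot-ml may expose different members):

    // Illustrative sketch, not the actual spot-ml trait.
    sealed trait PrecisionSketch { def convert(p: Double): Any }
    object Sketch64 extends PrecisionSketch { def convert(p: Double): Any = p }
    object Sketch32 extends PrecisionSketch { def convert(p: Double): Any = p.toFloat }

    // 32-bit scores trade precision for half the memory per probability.
    assert(Sketch32.convert(0.8).isInstanceOf[Float])
    assert(Sketch64.convert(0.8).isInstanceOf[Double])

Switching the later test subjects to ScalaTest's "it" also avoids restating the
FlatSpec subject string, which is how the "supicious" typo spread.
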
diff --git a/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
index e701bb9..5fb9fb3 100644
--- a/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
@@ -32,20 +32,22 @@
 
 class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
 
-  val ldaAlpha = 1.02
-  val ldaBeta = 1.001
-  val ldaMaxIterations = 20
-
-  "SparkLDA" should "handle an extremely unbalanced two word doc" in {
+  "SparkLDA" should "handle an extremely unbalanced two word doc with EM optimizer" in {
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
+    val ldaAlpha = 1.02
+    val ldaBeta = 1.001
+    val ldaMaxIterations = 20
+
+    val optimizer = "em"
+
     val catFancy = SpotLDAInput("pets", "cat", 1)
     val dogWorld = SpotLDAInput("pets", "dog", 999)
 
     val data = sparkContext.parallelize(Seq(catFancy, dogWorld))
     val out = SpotLDAWrapper.runLDA(sparkContext, sqlContext, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      ldaMaxIterations, FloatPointPrecisionUtility64)
+      optimizer, ldaMaxIterations, FloatPointPrecisionUtility64)
 
     val topicMixDF = out.docToTopicMix
 
@@ -59,16 +61,116 @@
     Math.abs(0.999 - (topicMix(0) * dogTopics(0) + topicMix(1) * dogTopics(1))) should be < 0.01
   }
 
-  it should "handle an extremely unbalanced two word doc with doc probabilities as Float" in {
+  it should "handle distinct docs on distinct words with EM optimizer" in {
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
+    val ldaAlpha = 1.2
+    val ldaBeta = 1.001
+    val ldaMaxIterations = 20
+
+    val optimizer = "em"
+
+    val catFancy = SpotLDAInput("cat fancy", "cat", 1)
+    val dogWorld = SpotLDAInput("dog world", "dog", 1)
+
+    val data = sparkContext.parallelize(Seq(catFancy, dogWorld))
+    val out = SpotLDAWrapper.runLDA(sparkContext, sqlContext, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, FloatPointPrecisionUtility64)
+
+    val topicMixDF = out.docToTopicMix
+    val dogTopicMix: Array[Double] =
+      topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first()
+        .toSeq(0).asInstanceOf[Seq[Double]].toArray
+
+    val catTopicMix: Array[Double] =
+      topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first()
+        .toSeq(0).asInstanceOf[Seq[Double]].toArray
+
+    val catTopics = out.wordResults("cat")
+    val dogTopics = out.wordResults("dog")
+
+    Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
+    Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
+  }
+
+  it should "handle an extremely unbalanced two word doc with Online optimizer" in {
+    val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
+    logger.setLevel(Level.WARN)
+
+    val ldaAlpha = 0.0009
+    val ldaBeta = 0.00001
+    val ldaMaxIterations = 400
+
+    val optimizer = "online"
+
     val catFancy = SpotLDAInput("pets", "cat", 1)
     val dogWorld = SpotLDAInput("pets", "dog", 999)
 
     val data = sparkContext.parallelize(Seq(catFancy, dogWorld))
     val out = SpotLDAWrapper.runLDA(sparkContext, sqlContext, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      ldaMaxIterations, FloatPointPrecisionUtility32)
+      optimizer, ldaMaxIterations, FloatPointPrecisionUtility64)
+
+    val topicMixDF = out.docToTopicMix
+
+    val topicMix =
+      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq(0)
+        .asInstanceOf[Seq[Double]].toArray
+    val catTopics = out.wordResults("cat")
+    val dogTopics = out.wordResults("dog")
+
+    Math.abs(topicMix(0) * catTopics(0) + topicMix(1) * catTopics(1)) should be < 0.01
+    Math.abs(0.999 - (topicMix(0) * dogTopics(0) + topicMix(1) * dogTopics(1))) should be < 0.01
+  }
+
+  it should "handle distinct docs on distinct words with Online optimizer" in {
+    val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
+    logger.setLevel(Level.WARN)
+
+    val ldaAlpha = 0.0009
+    val ldaBeta = 0.00001
+    val ldaMaxIterations = 400
+    val optimizer = "online"
+
+    val catFancy = SpotLDAInput("cat fancy", "cat", 1)
+    val dogWorld = SpotLDAInput("dog world", "dog", 1)
+
+    val data = sparkContext.parallelize(Seq(catFancy, dogWorld))
+    val out = SpotLDAWrapper.runLDA(sparkContext, sqlContext, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, FloatPointPrecisionUtility64)
+
+    val topicMixDF = out.docToTopicMix
+    val dogTopicMix: Array[Double] =
+      topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first()
+        .toSeq(0).asInstanceOf[Seq[Double]].toArray
+
+    val catTopicMix: Array[Double] =
+      topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first()
+        .toSeq(0).asInstanceOf[Seq[Double]].toArray
+
+    val catTopics = out.wordResults("cat")
+    val dogTopics = out.wordResults("dog")
+
+    Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
+    Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
+  }
+
+  it should "handle an extremely unbalanced two word doc with doc probabilities as Float" in {
+    val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
+    logger.setLevel(Level.WARN)
+
+    val ldaAlpha = 1.02
+    val ldaBeta = 1.001
+    val ldaMaxIterations = 20
+
+    val optimizer = "em"
+
+    val catFancy = SpotLDAInput("pets", "cat", 1)
+    val dogWorld = SpotLDAInput("pets", "dog", 999)
+
+    val data = sparkContext.parallelize(Seq(catFancy, dogWorld))
+    val out = SpotLDAWrapper.runLDA(sparkContext, sqlContext, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, FloatPointPrecisionUtility32)
 
     val topicMixDF = out.docToTopicMix
 
@@ -82,41 +184,22 @@
     Math.abs(0.999 - (topicMix(0).toDouble * dogTopics(0) + topicMix(1).toDouble * dogTopics(1))) should be < 0.01
   }
 
-  it should "handle distinct docs on distinct words" in {
-    val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
-    logger.setLevel(Level.WARN)
-    val catFancy = SpotLDAInput("cat fancy", "cat", 1)
-    val dogWorld = SpotLDAInput("dog world", "dog", 1)
-
-    val data = sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkContext, sqlContext, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      ldaMaxIterations, FloatPointPrecisionUtility64)
-
-    val topicMixDF = out.docToTopicMix
-    val dogTopicMix: Array[Double] =
-      topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first().toSeq(0)
-        .asInstanceOf[Seq[Double]].toArray
-
-    val catTopicMix: Array[Double] =
-      topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first().toSeq(0)
-        .asInstanceOf[Seq[Double]].toArray
-
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
-
-    Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
-    Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
-  }
-
   it should "handle distinct docs on distinct words with doc probabilities as Float" in {
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
+
+    val ldaAlpha = 1.02
+    val ldaBeta = 1.001
+    val ldaMaxIterations = 20
+
+    val optimizer = "em"
+
     val catFancy = SpotLDAInput("cat fancy", "cat", 1)
     val dogWorld = SpotLDAInput("dog world", "dog", 1)
 
     val data = sparkContext.parallelize(Seq(catFancy, dogWorld))
     val out = SpotLDAWrapper.runLDA(sparkContext, sqlContext, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      ldaMaxIterations, FloatPointPrecisionUtility32)
+      optimizer, ldaMaxIterations, FloatPointPrecisionUtility32)
 
     val topicMixDF = out.docToTopicMix
     val dogTopicMix: Array[Float] =
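
Every assertion in these LDA tests checks the same identity: for a word w in
document d, the document's topic mix dotted with w's per-topic probabilities
should recover w's share of the document (sum over t of P(t|d) * P(w|t) ≈
count(w) / totalWords). A tiny worked check with hypothetical values shaped
like the unbalanced "pets" doc (1 "cat", 999 "dog"):

    // Hypothetical output: the doc is ~99.9% topic 1, and topic 1 is dominated by "dog".
    val topicMix  = Array(0.001, 0.999)   // P(topic | "pets")
    val dogTopics = Array(0.010, 0.999)   // P("dog" | topic)
    val pDog = topicMix(0) * dogTopics(0) + topicMix(1) * dogTopics(1)  // ≈ 0.998
    assert(math.abs(0.999 - pDog) < 0.01) // matches the 999/1000 "dog" count
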
diff --git a/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala b/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala
index 397d938..e0bdf83 100644
--- a/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala
@@ -94,7 +94,7 @@
     val model =
       FlowSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, emTestConfig, data)
 
-    val scoredData = model.score(sparkContext, sqlContext, data, testConfig.precisionUtility)
+    val scoredData = model.score(sparkContext, sqlContext, data, emTestConfig.precisionUtility)
 
     val anomalyScore = scoredData.filter(scoredData(Hour) === 0).first().getAs[Double](Score)
     val typicalScores = scoredData.filter(scoredData(Hour) === 13).collect().map(_.getAs[Double](Score))
@@ -128,7 +128,7 @@
     val model =
       FlowSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, onlineTestConfig, data)
 
-    val scoredData = model.score(sparkContext, sqlContext, data)
+    val scoredData = model.score(sparkContext, sqlContext, data, onlineTestConfig.precisionUtility)
 
     val anomalyScore = scoredData.filter(scoredData(Hour) === 0).first().getAs[Double](Score)
     val typicalScores = scoredData.filter(scoredData(Hour) === 13).collect().map(_.getAs[Double](Score))
diff --git a/spot-ml/src/test/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysisTest.scala b/spot-ml/src/test/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysisTest.scala
index f7b2877..274804d 100644
--- a/spot-ml/src/test/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysisTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysisTest.scala
@@ -108,7 +108,7 @@
 
     val model = ProxySuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, emTestConfigProxy, data)
 
-    val scoredData = model.score(sparkContext, data)
+    val scoredData = model.score(sparkContext, data, emTestConfigProxy.precisionUtility)
 
     val anomalyScore = scoredData.filter(scoredData(Host) === "intel.com").first().getAs[Double](Score)
     val typicalScores = scoredData.filter(scoredData(Host) === "maw.bronto.com").collect().map(_.getAs[Double](Score))
@@ -152,7 +152,7 @@
 
     val model = ProxySuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, onlineTestConfigProxy, data)
 
-    val scoredData = model.score(sparkContext, data, testConfigProxy.precisionUtility)
+    val scoredData = model.score(sparkContext, data, onlineTestConfigProxy.precisionUtility)
 
     val anomalyScore = scoredData.filter(scoredData(Host) ===  "intel.com").first().getAs[Double](Score)
     val typicalScores = scoredData.filter(scoredData(Host) === "maw.bronto.com").collect().map(_.getAs[Double](Score))
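
The netflow and proxy fixes above are all one copy-paste hazard: training with
one config (emTestConfig / onlineTestConfig) but scoring with another, or with
no precision utility at all. A hedged sketch of a test helper that rules the
mismatch out by construction, assuming the trainModel/score signatures shown in
the hunks above (the helper name is hypothetical; sparkContext, sqlContext and
logger come from the test fixture):

    import org.apache.spark.sql.DataFrame
    import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
    import org.apache.spot.netflow.model.FlowSuspiciousConnectsModel

    // One config in, one scored DataFrame out: training and scoring can never
    // disagree about precisionUtility.
    def trainAndScore(config: SuspiciousConnectsConfig, data: DataFrame): DataFrame = {
      val model = FlowSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, config, data)
      model.score(sparkContext, sqlContext, data, config.precisionUtility)
    }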