Made changes after code review.
Fixed conflicts after rebasing with incubator-spot/master
Fixed minor typos in DNSSuspiciousConnectsAnalysisTest.scala
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
index 7a5266c..315aae6 100644
--- a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
@@ -48,7 +48,7 @@
ldaSeed: Option[Long],
ldaAlpha: Double,
ldaBeta: Double,
- ldaOptimizer: String,
+ ldaOptimizerOption: String,
maxIterations: Int,
precisionUtility: FloatPointPrecisionUtility): SpotLDAOutput = {
@@ -82,14 +82,13 @@
wordDictionary,
sqlContext)
- val corpusSize = ldaCorpus.count()
-
docWordCountCache.unpersist()
// Instantiate optimizer based on input
val ldaOptimizer = ldaOptimizerOption match {
case "em" => new EMLDAOptimizer
case "online" => new OnlineLDAOptimizer().setOptimizeDocConcentration(true).setMiniBatchFraction({
+ val corpusSize = ldaCorpus.count()
if (corpusSize < 2) 0.75
else (0.05 + 1) / corpusSize
})
diff --git a/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala b/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
index 2435561..a88a00a 100644
--- a/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
@@ -88,7 +88,7 @@
precisionUtility = FloatPointPrecisionUtility32)
- "dns supicious connects analysis" should "estimate correct probabilities in toy data with framelength anomaly using" +
+ "dns suspicious connects analysis" should "estimate correct probabilities in toy data with framelength anomaly using" +
" EMLDAOptimizer" in {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
@@ -120,7 +120,7 @@
val typicalRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "122.2o7.turner.com", "0x00000001", 1, 0)
val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
val model = DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, onlineTestConfig, data)
- val scoredData = model.score(sparkContext, sqlContext, data, onlineTestConfig.userDomain)
+ val scoredData = model.score(sparkContext, sqlContext, data, onlineTestConfig.userDomain, onlineTestConfig.precisionUtility)
val anomalyScore = scoredData.filter(scoredData(FrameLength) === 1).first().getAs[Double](Score)
val typicalScores = scoredData.filter(scoredData(FrameLength) === 168).collect().map(_.getAs[Double](Score))
@@ -132,7 +132,7 @@
Math.abs(typicalScores(3) - 0.8d) should be <= 0.01d
}
- "dns supicious connects analysis" should "estimate correct probabilities in toy data with subdomain length anomaly " +
+ it should "estimate correct probabilities in toy data with subdomain length anomaly " +
"using EMLDAOptimizer" in {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
@@ -156,7 +156,7 @@
0)
val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
val model = DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, emTestConfig, data)
- val scoredData = model.score(sparkContext, sqlContext, data, emTestConfig.userDomain)
+ val scoredData = model.score(sparkContext, sqlContext, data, emTestConfig.userDomain, emTestConfig.precisionUtility)
val anomalyScore = scoredData.
filter(scoredData(QueryName) === "1111111111111111111111111111111111111111111111111111111111111.tinker.turner.com").
first().
@@ -171,10 +171,8 @@
Math.abs(typicalScores(3) - 0.8d) should be <= 0.01d
}
- "dns supicious connects analysis" should "estimate correct probabilities in toy data with subdomain length anomaly " +
+ it should "estimate correct probabilities in toy data with subdomain length anomaly " +
"using OnlineLDAOptimizer" in {
- "dns suspicious connects analysis" should "estimate correct probabilities in toy data with subdomain length anomaly" in {
- it should "estimate correct probabilities in toy data with subdomain length anomaly" in {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
@@ -195,11 +193,11 @@
"0x00000001",
1,
0)
+
val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
val model = DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, onlineTestConfig, data)
- val scoredData = model.score(sparkContext, sqlContext, data, onlineTestConfig.userDomain)
- val model = DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, testConfig, data)
- val scoredData = model.score(sparkContext, sqlContext, data, testConfig.userDomain, testConfig.precisionUtility)
+ val scoredData = model.score(sparkContext, sqlContext, data, onlineTestConfig.userDomain, onlineTestConfig.precisionUtility)
+
val anomalyScore = scoredData.
filter(scoredData(QueryName) === "1111111111111111111111111111111111111111111111111111111111111.tinker.turner.com").
first().
diff --git a/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
index e701bb9..5fb9fb3 100644
--- a/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
@@ -32,20 +32,22 @@
class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
- val ldaAlpha = 1.02
- val ldaBeta = 1.001
- val ldaMaxIterations = 20
-
- "SparkLDA" should "handle an extremely unbalanced two word doc" in {
+ "SparkLDA" should "handle an extremely unbalanced two word doc with EM optimizer" in {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
+ val ldaAlpha = 1.02
+ val ldaBeta = 1.001
+ val ldaMaxIterations = 20
+
+ val optimizer = "em"
+
val catFancy = SpotLDAInput("pets", "cat", 1)
val dogWorld = SpotLDAInput("pets", "dog", 999)
val data = sparkContext.parallelize(Seq(catFancy, dogWorld))
val out = SpotLDAWrapper.runLDA(sparkContext, sqlContext, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
- ldaMaxIterations, FloatPointPrecisionUtility64)
+ optimizer, ldaMaxIterations, FloatPointPrecisionUtility64)
val topicMixDF = out.docToTopicMix
@@ -59,16 +61,116 @@
Math.abs(0.999 - (topicMix(0) * dogTopics(0) + topicMix(1) * dogTopics(1))) should be < 0.01
}
- it should "handle an extremely unbalanced two word doc with doc probabilities as Float" in {
+ it should "handle distinct docs on distinct words with EM optimizer" in {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
+ val ldaAlpha = 1.2
+ val ldaBeta = 1.001
+ val ldaMaxIterations = 20
+
+ val optimizer = "em"
+
+ val catFancy = SpotLDAInput("cat fancy", "cat", 1)
+ val dogWorld = SpotLDAInput("dog world", "dog", 1)
+
+ val data = sparkContext.parallelize(Seq(catFancy, dogWorld))
+ val out = SpotLDAWrapper.runLDA(sparkContext, sqlContext, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+ optimizer, ldaMaxIterations, FloatPointPrecisionUtility64)
+
+ val topicMixDF = out.docToTopicMix
+ val dogTopicMix: Array[Double] =
+ topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first()
+ .toSeq(0).asInstanceOf[Seq[Double]].toArray
+
+ val catTopicMix: Array[Double] =
+ topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first()
+ .toSeq(0).asInstanceOf[Seq[Double]].toArray
+
+ val catTopics = out.wordResults("cat")
+ val dogTopics = out.wordResults("dog")
+
+ Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
+ Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
+ }
+
+ it should "handle an extremely unbalanced two word doc with Online optimizer" in {
+ val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
+ logger.setLevel(Level.WARN)
+
+ val ldaAlpha = 0.0009
+ val ldaBeta = 0.00001
+ val ldaMaxIterations = 400
+
+ val optimizer = "online"
+
val catFancy = SpotLDAInput("pets", "cat", 1)
val dogWorld = SpotLDAInput("pets", "dog", 999)
val data = sparkContext.parallelize(Seq(catFancy, dogWorld))
val out = SpotLDAWrapper.runLDA(sparkContext, sqlContext, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
- ldaMaxIterations, FloatPointPrecisionUtility32)
+ optimizer, ldaMaxIterations, FloatPointPrecisionUtility64)
+
+ val topicMixDF = out.docToTopicMix
+
+ val topicMix =
+ topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq(0)
+ .asInstanceOf[Seq[Double]].toArray
+ val catTopics = out.wordResults("cat")
+ val dogTopics = out.wordResults("dog")
+
+ Math.abs(topicMix(0) * catTopics(0) + topicMix(1) * catTopics(1)) should be < 0.01
+ Math.abs(0.999 - (topicMix(0) * dogTopics(0) + topicMix(1) * dogTopics(1))) should be < 0.01
+ }
+
+ it should "handle distinct docs on distinct words with Online optimizer" in {
+ val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
+ logger.setLevel(Level.WARN)
+
+ val ldaAlpha = 0.0009
+ val ldaBeta = 0.00001
+ val ldaMaxIterations = 400
+ val optimizer = "online"
+
+ val catFancy = SpotLDAInput("cat fancy", "cat", 1)
+ val dogWorld = SpotLDAInput("dog world", "dog", 1)
+
+ val data = sparkContext.parallelize(Seq(catFancy, dogWorld))
+ val out = SpotLDAWrapper.runLDA(sparkContext, sqlContext, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+ optimizer, ldaMaxIterations, FloatPointPrecisionUtility64)
+
+ val topicMixDF = out.docToTopicMix
+ val dogTopicMix: Array[Double] =
+ topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first()
+ .toSeq(0).asInstanceOf[Seq[Double]].toArray
+
+ val catTopicMix: Array[Double] =
+ topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first()
+ .toSeq(0).asInstanceOf[Seq[Double]].toArray
+
+ val catTopics = out.wordResults("cat")
+ val dogTopics = out.wordResults("dog")
+
+ Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
+ Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
+ }
+
+ it should "handle an extremely unbalanced two word doc with doc probabilities as Float" in {
+ val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
+ logger.setLevel(Level.WARN)
+
+ val ldaAlpha = 1.02
+ val ldaBeta = 1.001
+ val ldaMaxIterations = 20
+
+ val optimizer = "em"
+
+ val catFancy = SpotLDAInput("pets", "cat", 1)
+ val dogWorld = SpotLDAInput("pets", "dog", 999)
+
+ val data = sparkContext.parallelize(Seq(catFancy, dogWorld))
+ val out = SpotLDAWrapper.runLDA(sparkContext, sqlContext, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+ optimizer, ldaMaxIterations, FloatPointPrecisionUtility32)
val topicMixDF = out.docToTopicMix
@@ -82,41 +184,22 @@
Math.abs(0.999 - (topicMix(0).toDouble * dogTopics(0) + topicMix(1).toDouble * dogTopics(1))) should be < 0.01
}
- it should "handle distinct docs on distinct words" in {
- val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
- logger.setLevel(Level.WARN)
- val catFancy = SpotLDAInput("cat fancy", "cat", 1)
- val dogWorld = SpotLDAInput("dog world", "dog", 1)
-
- val data = sparkContext.parallelize(Seq(catFancy, dogWorld))
- val out = SpotLDAWrapper.runLDA(sparkContext, sqlContext, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
- ldaMaxIterations, FloatPointPrecisionUtility64)
-
- val topicMixDF = out.docToTopicMix
- val dogTopicMix: Array[Double] =
- topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first().toSeq(0)
- .asInstanceOf[Seq[Double]].toArray
-
- val catTopicMix: Array[Double] =
- topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first().toSeq(0)
- .asInstanceOf[Seq[Double]].toArray
-
- val catTopics = out.wordResults("cat")
- val dogTopics = out.wordResults("dog")
-
- Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
- Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
- }
-
it should "handle distinct docs on distinct words with doc probabilities as Float" in {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
+
+ val ldaAlpha = 1.02
+ val ldaBeta = 1.001
+ val ldaMaxIterations = 20
+
+ val optimizer = "em"
+
val catFancy = SpotLDAInput("cat fancy", "cat", 1)
val dogWorld = SpotLDAInput("dog world", "dog", 1)
val data = sparkContext.parallelize(Seq(catFancy, dogWorld))
val out = SpotLDAWrapper.runLDA(sparkContext, sqlContext, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
- ldaMaxIterations, FloatPointPrecisionUtility32)
+ optimizer, ldaMaxIterations, FloatPointPrecisionUtility32)
val topicMixDF = out.docToTopicMix
val dogTopicMix: Array[Float] =
diff --git a/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala b/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala
index 397d938..e0bdf83 100644
--- a/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala
@@ -94,7 +94,7 @@
val model =
FlowSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, emTestConfig, data)
- val scoredData = model.score(sparkContext, sqlContext, data, testConfig.precisionUtility)
+ val scoredData = model.score(sparkContext, sqlContext, data, emTestConfig.precisionUtility)
val anomalyScore = scoredData.filter(scoredData(Hour) === 0).first().getAs[Double](Score)
val typicalScores = scoredData.filter(scoredData(Hour) === 13).collect().map(_.getAs[Double](Score))
@@ -128,7 +128,7 @@
val model =
FlowSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, onlineTestConfig, data)
- val scoredData = model.score(sparkContext, sqlContext, data)
+ val scoredData = model.score(sparkContext, sqlContext, data, onlineTestConfig.precisionUtility)
val anomalyScore = scoredData.filter(scoredData(Hour) === 0).first().getAs[Double](Score)
val typicalScores = scoredData.filter(scoredData(Hour) === 13).collect().map(_.getAs[Double](Score))
diff --git a/spot-ml/src/test/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysisTest.scala b/spot-ml/src/test/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysisTest.scala
index f7b2877..274804d 100644
--- a/spot-ml/src/test/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysisTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysisTest.scala
@@ -108,7 +108,7 @@
val model = ProxySuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, emTestConfigProxy, data)
- val scoredData = model.score(sparkContext, data)
+ val scoredData = model.score(sparkContext, data, emTestConfigProxy.precisionUtility)
val anomalyScore = scoredData.filter(scoredData(Host) === "intel.com").first().getAs[Double](Score)
val typicalScores = scoredData.filter(scoredData(Host) === "maw.bronto.com").collect().map(_.getAs[Double](Score))
@@ -152,7 +152,7 @@
val model = ProxySuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, onlineTestConfigProxy, data)
- val scoredData = model.score(sparkContext, data, testConfigProxy.precisionUtility)
+ val scoredData = model.score(sparkContext, data, onlineTestConfigProxy.precisionUtility)
val anomalyScore = scoredData.filter(scoredData(Host) === "intel.com").first().getAs[Double](Score)
val typicalScores = scoredData.filter(scoredData(Host) === "maw.bronto.com").collect().map(_.getAs[Double](Score))