Merge pull request #164 from jfn6030217/spot-286-remove-optimization
(SPOT-286) Do not call org.apache.spark.sql.functions.broadcast() on …
diff --git a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
index 7245acf..92734d0 100644
--- a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
@@ -100,7 +100,7 @@
documentTopicMix))
inDF
- .join(org.apache.spark.sql.functions.broadcast(ipToTopicMix), inDF(ClientIP) === ipToTopicMix(DocumentName),
+ .join(ipToTopicMix, inDF(ClientIP) === ipToTopicMix(DocumentName),
"left_outer")
.selectExpr(inDF.schema.fieldNames :+ TopicProbabilityMix: _*)
.withColumn(Score, scoringUDF(DNSSuspiciousConnectsModel.modelColumns :+ col(TopicProbabilityMix): _*))
@@ -213,4 +213,4 @@
case class TempFields(topDomainClass: Int, subdomainLength: Integer, subdomainEntropy: Double, numPeriods: Integer)
-}
\ No newline at end of file
+}
diff --git a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
index 4e09616..6637c87 100644
--- a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
@@ -64,13 +64,13 @@
*/
val dataWithSrcTopicMix = {
- val recordsWithSrcIPTopicMixes = flowRecords.join(org.apache.spark.sql.functions.broadcast(ipToTopicMix),
+ val recordsWithSrcIPTopicMixes = flowRecords.join(ipToTopicMix,
flowRecords(SourceIP) === ipToTopicMix(DocumentName), "left_outer")
val schemaWithSrcTopicMix = flowRecords.schema.fieldNames :+ TopicProbabilityMix
val dataWithSrcIpProb: DataFrame = recordsWithSrcIPTopicMixes.selectExpr(schemaWithSrcTopicMix: _*)
.withColumnRenamed(TopicProbabilityMix, SrcIpTopicMix)
- val recordsWithIPTopicMixes = dataWithSrcIpProb.join(org.apache.spark.sql.functions.broadcast(ipToTopicMix),
+ val recordsWithIPTopicMixes = dataWithSrcIpProb.join(ipToTopicMix,
dataWithSrcIpProb(DestinationIP) === ipToTopicMix(DocumentName), "left_outer")
val schema = dataWithSrcIpProb.schema.fieldNames :+ TopicProbabilityMix
recordsWithIPTopicMixes.selectExpr(schema: _*).withColumnRenamed(TopicProbabilityMix, DstIpTopicMix)
@@ -186,4 +186,4 @@
new FlowSuspiciousConnectsModel(config.topicCount, results.documentToTopicMix, results.wordToTopicMix)
}
-}
\ No newline at end of file
+}
diff --git a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
index 7332fe4..d260ca0 100644
--- a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
@@ -79,7 +79,7 @@
scoreFunction.score(precisionUtility)(documentTopicMix, word))
wordedDataFrame
- .join(org.apache.spark.sql.functions.broadcast(ipToTopicMIx), dataFrame(ClientIP) === ipToTopicMIx(DocumentName), "left_outer")
+ .join(ipToTopicMIx, dataFrame(ClientIP) === ipToTopicMIx(DocumentName), "left_outer")
.selectExpr(wordedDataFrame.schema.fieldNames :+ TopicProbabilityMix: _*)
.withColumn(Score, udfScoreFunction(col(TopicProbabilityMix), col(Word)))
.drop(TopicProbabilityMix)
@@ -213,4 +213,4 @@
.map({ case Row(ip, word) => ((ip.asInstanceOf[String], word.asInstanceOf[String]), 1) })
.reduceByKey(_ + _).map({ case ((ip, word), count) => SpotLDAInput(ip, word, count) })
}
-}
\ No newline at end of file
+}