(SPOT-286) Do not call org.apache.spark.sql.functions.broadcast() on large sets

https://issues.apache.org/jira/projects/SPOT/issues/SPOT-286
identifies an issue with calling org.apache.spark.sql.functions.broadcast()
on large datasets which may exceed a limit and cause an exception.

It was determined that wrapping a dataset in this way is a performance
optimization, but the justification for this optimization is not clear.

It was decided that it was preferable to undo the optimization rather
than allow it to fail in some situations.
diff --git a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
index 7245acf..92734d0 100644
--- a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
@@ -100,7 +100,7 @@
         documentTopicMix))
 
     inDF
-      .join(org.apache.spark.sql.functions.broadcast(ipToTopicMix), inDF(ClientIP) === ipToTopicMix(DocumentName),
+      .join(ipToTopicMix, inDF(ClientIP) === ipToTopicMix(DocumentName),
         "left_outer")
       .selectExpr(inDF.schema.fieldNames :+ TopicProbabilityMix: _*)
       .withColumn(Score, scoringUDF(DNSSuspiciousConnectsModel.modelColumns :+ col(TopicProbabilityMix): _*))
@@ -213,4 +213,4 @@
 
   case class TempFields(topDomainClass: Int, subdomainLength: Integer, subdomainEntropy: Double, numPeriods: Integer)
 
-}
\ No newline at end of file
+}
diff --git a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
index 4e09616..6637c87 100644
--- a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
@@ -64,13 +64,13 @@
       */
     val dataWithSrcTopicMix = {
 
-      val recordsWithSrcIPTopicMixes = flowRecords.join(org.apache.spark.sql.functions.broadcast(ipToTopicMix),
+      val recordsWithSrcIPTopicMixes = flowRecords.join(ipToTopicMix,
         flowRecords(SourceIP) === ipToTopicMix(DocumentName), "left_outer")
       val schemaWithSrcTopicMix = flowRecords.schema.fieldNames :+ TopicProbabilityMix
       val dataWithSrcIpProb: DataFrame = recordsWithSrcIPTopicMixes.selectExpr(schemaWithSrcTopicMix: _*)
         .withColumnRenamed(TopicProbabilityMix, SrcIpTopicMix)
 
-      val recordsWithIPTopicMixes = dataWithSrcIpProb.join(org.apache.spark.sql.functions.broadcast(ipToTopicMix),
+      val recordsWithIPTopicMixes = dataWithSrcIpProb.join(ipToTopicMix,
         dataWithSrcIpProb(DestinationIP) === ipToTopicMix(DocumentName), "left_outer")
       val schema = dataWithSrcIpProb.schema.fieldNames :+ TopicProbabilityMix
       recordsWithIPTopicMixes.selectExpr(schema: _*).withColumnRenamed(TopicProbabilityMix, DstIpTopicMix)
@@ -186,4 +186,4 @@
 
     new FlowSuspiciousConnectsModel(config.topicCount, results.documentToTopicMix, results.wordToTopicMix)
   }
-}
\ No newline at end of file
+}
diff --git a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
index 7332fe4..d260ca0 100644
--- a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
@@ -79,7 +79,7 @@
       scoreFunction.score(precisionUtility)(documentTopicMix, word))
 
     wordedDataFrame
-      .join(org.apache.spark.sql.functions.broadcast(ipToTopicMIx), dataFrame(ClientIP) === ipToTopicMIx(DocumentName), "left_outer")
+      .join(ipToTopicMIx, dataFrame(ClientIP) === ipToTopicMIx(DocumentName), "left_outer")
       .selectExpr(wordedDataFrame.schema.fieldNames :+ TopicProbabilityMix: _*)
       .withColumn(Score, udfScoreFunction(col(TopicProbabilityMix), col(Word)))
       .drop(TopicProbabilityMix)
@@ -213,4 +213,4 @@
       .map({ case Row(ip, word) => ((ip.asInstanceOf[String], word.asInstanceOf[String]), 1) })
       .reduceByKey(_ + _).map({ case ((ip, word), count) => SpotLDAInput(ip, word, count) })
   }
-}
\ No newline at end of file
+}