diff --git a/spot-ingest/common/kafka_topic.sh b/spot-ingest/common/kafka_topic.sh
index ab95495..4c078c9 100755
--- a/spot-ingest/common/kafka_topic.sh
+++ b/spot-ingest/common/kafka_topic.sh
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
 
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
diff --git a/spot-ingest/master_collector.py b/spot-ingest/master_collector.py
index 9cd91ea..6f6ff7c 100755
--- a/spot-ingest/master_collector.py
+++ b/spot-ingest/master_collector.py
@@ -21,70 +21,93 @@
 import os
 import json
 import sys
+import datetime
 from common.utils import Util
 from common.kerberos import Kerberos
 from common.kafka_client import KafkaTopic
-import datetime 
+
 
 # get master configuration.
-script_path = os.path.dirname(os.path.abspath(__file__))
-conf_file = "{0}/ingest_conf.json".format(script_path)
-master_conf = json.loads(open (conf_file).read())
+SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
+CONF_FILE = "{0}/ingest_conf.json".format(SCRIPT_PATH)
+# use a context manager so the configuration file handle is closed after loading.
+with open(CONF_FILE) as _conf:
+    MASTER_CONF = json.load(_conf)
 
+
 def main():
+    """Parse the command line arguments and start the requested collector."""
 
     # input Parameters
     parser = argparse.ArgumentParser(description="Master Collector Ingest Daemon")
-    parser.add_argument('-t','--type',dest='type',required=True,help='Type of data that will be ingested (Pipeline Configuration)',metavar='')
-    parser.add_argument('-w','--workers',dest='workers_num',required=True,help='Number of workers for the ingest process',metavar='')
-    parser.add_argument('-id','--ingestId',dest='ingest_id',required=False,help='Ingest ID',metavar='')
+    parser.add_argument('-t', '--type', dest='type', required=True,
+                        help='Type of data that will be ingested (Pipeline Configuration)',
+                        metavar='')
+    parser.add_argument('-w', '--workers', dest='workers_num',
+                        required=True, help='Number of workers for the ingest process',
+                        metavar='')
+    parser.add_argument('-id', '--ingestId', dest='ingest_id',
+                        required=False, help='Ingest ID', metavar='')
     args = parser.parse_args()
 
     # start collector based on data source type.
-    start_collector(args.type,args.workers_num,args.ingest_id)
+    start_collector(args.type, args.workers_num, args.ingest_id)
 
-def start_collector(type,workers_num,id=None):
+
+def start_collector(type, workers_num, id=None):
+    """Validate a pipeline configuration and start its ingest collector.
+
+    :param type: pipeline name; must be a key of MASTER_CONF["pipelines"].
+    :param workers_num: number of workers, passed through to KafkaTopic.
+    :param id: optional Kafka topic name. When empty, a topic named
+        "SPOT-INGEST-<type>_<time of day>" is generated.
+
+    Exits the process with status 1 if the pipeline is unknown or its
+    data source type does not pass Util.validate_data_source.
+    """
 
     # generate ingest id
-    ingest_id = str(datetime.datetime.time(datetime.datetime.now())).replace(":","_").replace(".","_")
-    
+    ingest_id = str(datetime.datetime.now().time()).replace(":", "_").replace(".", "_")
+
     # create logger.
     logger = Util.get_logger("SPOT.INGEST")
 
     # validate the given configuration exists in ingest_conf.json.
-    if not type in master_conf["pipelines"]:
-        logger.error("'{0}' type is not a valid configuration.".format(type));
+    if type not in MASTER_CONF["pipelines"]:
+        logger.error("'{0}' type is not a valid configuration.".format(type))
         sys.exit(1)
 
     # validate the type is a valid module.
-    if not Util.validate_data_source(master_conf["pipelines"][type]["type"]):
-        logger.error("'{0}' type is not configured. Please check you ingest conf file".format(master_conf["pipelines"][type]["type"]));
+    data_source = MASTER_CONF["pipelines"][type]["type"]
+    if not Util.validate_data_source(data_source):
+        logger.error("'{0}' type is not configured. Please check your ingest conf file".format(data_source))
         sys.exit(1)
-    
+
     # validate if kerberos authentication is required.
     if os.getenv('KRB_AUTH'):
         kb = Kerberos()
         kb.authenticate()
-    
+
     # kafka server info.
     logger.info("Initializing kafka instance")
-    k_server = master_conf["kafka"]['kafka_server']
-    k_port = master_conf["kafka"]['kafka_port']
+    k_server = MASTER_CONF["kafka"]['kafka_server']
+    k_port = MASTER_CONF["kafka"]['kafka_port']
 
     # required zookeeper info.
-    zk_server = master_conf["kafka"]['zookeper_server']
-    zk_port = master_conf["kafka"]['zookeper_port']
-         
-    topic = "SPOT-INGEST-{0}_{1}".format(type,ingest_id) if not id else id
-    kafka = KafkaTopic(topic,k_server,k_port,zk_server,zk_port,workers_num)
+    zk_server = MASTER_CONF["kafka"]['zookeper_server']
+    zk_port = MASTER_CONF["kafka"]['zookeper_port']
+
+    topic = "SPOT-INGEST-{0}_{1}".format(type, ingest_id) if not id else id
+    kafka = KafkaTopic(topic, k_server, k_port, zk_server, zk_port, workers_num)
 
     # create a collector instance based on data source type.
     logger.info("Starting {0} ingest instance".format(topic))
-    module = __import__("pipelines.{0}.collector".format(master_conf["pipelines"][type]["type"]),fromlist=['Collector'])
+    module = __import__("pipelines.{0}.collector".format(data_source), fromlist=['Collector'])
 
     # start collector.
-    ingest_collector = module.Collector(master_conf['hdfs_app_path'],kafka,type)
+    ingest_collector = module.Collector(MASTER_CONF['hdfs_app_path'], kafka, type)
     ingest_collector.start()
 
-if __name__=='__main__':
+
+if __name__ == '__main__':
     main()
diff --git a/spot-ingest/start_ingest_standalone.sh b/spot-ingest/start_ingest_standalone.sh
index 0e3bfd5..1a16612 100755
--- a/spot-ingest/start_ingest_standalone.sh
+++ b/spot-ingest/start_ingest_standalone.sh
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
 
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
diff --git a/spot-ingest/worker.py b/spot-ingest/worker.py
index db51def..5c29148 100755
--- a/spot-ingest/worker.py
+++ b/spot-ingest/worker.py
@@ -20,42 +20,63 @@
 import argparse
 import os
 import json
-import logging
 import sys
 from common.utils import Util
 from common.kerberos import Kerberos
 from common.kafka_client import KafkaConsumer
 
-script_path = os.path.dirname(os.path.abspath(__file__))
-conf_file = "{0}/ingest_conf.json".format(script_path)
-worker_conf = json.loads(open (conf_file).read())
+SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
+CONF_FILE = "{0}/ingest_conf.json".format(SCRIPT_PATH)
+# use a context manager so the configuration file handle is closed after loading.
+with open(CONF_FILE) as _conf:
+    WORKER_CONF = json.load(_conf)
 
+
 def main():
+    """Parse the command line arguments and start the requested worker."""
 
     # input parameters
     parser = argparse.ArgumentParser(description="Worker Ingest Framework")
-    parser.add_argument('-t','--type',dest='type',required=True,help='Type of data that will be ingested (Pipeline Configuration)',metavar='')
-    parser.add_argument('-i','--id',dest='id',required=True,help='Worker Id, this is needed to sync Kafka and Ingest framework (Partition Number)',metavar='')
-    parser.add_argument('-top','--topic',dest='topic',required=True,help='Topic to read from.',metavar="")
-    parser.add_argument('-p','--processingParallelism',dest='processes',required=False,help='Processing Parallelism',metavar="")
+    parser.add_argument('-t', '--type', dest='type', required=True,
+                        help='Type of data that will be ingested (Pipeline Configuration)',
+                        metavar='')
+    parser.add_argument('-i', '--id', dest='id', required=True,
+                        help='Worker Id, this is needed to sync Kafka and Ingest framework (Partition Number)',
+                        metavar='')
+    parser.add_argument('-top', '--topic', dest='topic', required=True,
+                        help='Topic to read from.', metavar="")
+    parser.add_argument('-p', '--processingParallelism', dest='processes',
+                        required=False, help='Processing Parallelism', metavar="")
     args = parser.parse_args()
 
     # start worker based on the type.
-    start_worker(args.type,args.topic,args.id,args.processes)
+    start_worker(args.type, args.topic, args.id, args.processes)
 
 
-def start_worker(type,topic,id,processes=None):
+def start_worker(type, topic, id, processes=None):
+    """Validate a pipeline configuration and start its ingest worker.
+
+    :param type: pipeline name; must be a key of WORKER_CONF["pipelines"].
+    :param topic: Kafka topic to read from.
+    :param id: worker id, passed to KafkaConsumer (partition number).
+    :param processes: optional processing parallelism, passed to the Worker.
+
+    Exits the process with status 1 if the pipeline is unknown or its
+    data source type does not pass Util.validate_data_source.
+    """
 
     logger = Util.get_logger("SPOT.INGEST.WORKER")
 
     # validate the given configuration exists in ingest_conf.json.
-    if not type in worker_conf["pipelines"]:
-        logger.error("'{0}' type is not a valid configuration.".format(type));
+    if type not in WORKER_CONF["pipelines"]:
+        logger.error("'{0}' type is not a valid configuration.".format(type))
         sys.exit(1)
 
     # validate the type is a valid module.
-    if not Util.validate_data_source(worker_conf["pipelines"][type]["type"]):
-        logger.error("The provided data source {0} is not valid".format(type));sys.exit(1)
+    data_source = WORKER_CONF["pipelines"][type]["type"]
+    if not Util.validate_data_source(data_source):
+        logger.error("The provided data source {0} is not valid".format(type))
+        sys.exit(1)
 
-    # validate if kerberos authentication is requiered.
+    # validate if kerberos authentication is required.
     if os.getenv('KRB_AUTH'):
@@ -63,27 +84,26 @@
         kb.authenticate()
 
     # create a worker instance based on the data source type.
-    module = __import__("pipelines.{0}.worker".format(worker_conf["pipelines"][type]["type"]),fromlist=['Worker'])
+    module = __import__("pipelines.{0}.worker".format(data_source), fromlist=['Worker'])
 
     # kafka server info.
     logger.info("Initializing kafka instance")
-    k_server = worker_conf["kafka"]['kafka_server']
-    k_port = worker_conf["kafka"]['kafka_port']
+    k_server = WORKER_CONF["kafka"]['kafka_server']
+    k_port = WORKER_CONF["kafka"]['kafka_port']
 
     # required zookeeper info.
-    zk_server = worker_conf["kafka"]['zookeper_server']
-    zk_port = worker_conf["kafka"]['zookeper_port']
-    topic = topic
+    zk_server = WORKER_CONF["kafka"]['zookeper_server']
+    zk_port = WORKER_CONF["kafka"]['zookeper_port']
 
     # create kafka consumer.
-    kafka_consumer = KafkaConsumer(topic,k_server,k_port,zk_server,zk_port,id)
+    kafka_consumer = KafkaConsumer(topic, k_server, k_port, zk_server, zk_port, id)
 
     # start worker.
-    db_name = worker_conf['dbname']
-    app_path = worker_conf['hdfs_app_path']
-    ingest_worker = module.Worker(db_name,app_path,kafka_consumer,type,processes)
+    db_name = WORKER_CONF['dbname']
+    app_path = WORKER_CONF['hdfs_app_path']
+    ingest_worker = module.Worker(db_name, app_path, kafka_consumer, type, processes)
     ingest_worker.start()
 
-if __name__=='__main__':
+
+if __name__ == '__main__':
     main()
-
diff --git a/spot-ml/ml_ops.sh b/spot-ml/ml_ops.sh
index dd00bbc..abe3d06 100755
--- a/spot-ml/ml_ops.sh
+++ b/spot-ml/ml_ops.sh
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
 
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
diff --git a/spot-ml/ml_test.sh b/spot-ml/ml_test.sh
index 3036c93..7a4971a 100755
--- a/spot-ml/ml_test.sh
+++ b/spot-ml/ml_test.sh
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
 
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
@@ -80,4 +80,4 @@
   --ldabeta ${LDA_BETA} \
   --ldaoptimizer ${LDA_OPTIMIZER} \
   --precision ${PRECISION} \
-  $USER_DOMAIN_CMD
\ No newline at end of file
+  $USER_DOMAIN_CMD
diff --git a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
index dfcb543..7245acf 100644
--- a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
@@ -19,22 +19,18 @@
 
 import org.apache.log4j.Logger
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.dns.DNSSchema._
 import org.apache.spot.dns.DNSWordCreation
-import org.apache.spot.lda.SpotLDAWrapper
-import org.apache.spot.lda.SpotLDAWrapper._
 import org.apache.spot.lda.SpotLDAWrapperSchema._
+import org.apache.spot.lda._
 import org.apache.spot.utilities.DomainProcessor.DomainInfo
 import org.apache.spot.utilities._
 import org.apache.spot.utilities.data.validation.InvalidDataHandler
 
-import scala.util.{Failure, Success, Try}
-
 
 /**
   * A probabilistic model of the DNS queries issued by each client IP.
@@ -50,17 +46,17 @@
   *
   * Create these models using the  factory in the companion object.
   *
-  * @param inTopicCount          Number of topics to use in the topic model.
-  * @param inIpToTopicMix        Per-IP topic mix.
-  * @param inWordToPerTopicProb  Per-word,  an array of probability of word given topic per topic.
+  * @param inTopicCount         Number of topics to use in the topic model.
+  * @param inIpToTopicMix       Per-IP topic mix.
+  * @param inWordToPerTopicProb Per-word,  an array of probability of word given topic per topic.
   */
 class DNSSuspiciousConnectsModel(inTopicCount: Int,
                                  inIpToTopicMix: DataFrame,
                                  inWordToPerTopicProb: Map[String, Array[Double]]) {
 
-  val topicCount = inTopicCount
-  val ipToTopicMix = inIpToTopicMix
-  val wordToPerTopicProb = inWordToPerTopicProb
+  val topicCount: Int = inTopicCount
+  val ipToTopicMix: DataFrame = inIpToTopicMix
+  val wordToPerTopicProb: Map[String, Array[Double]] = inWordToPerTopicProb
 
   /**
     * Use a suspicious connects model to assign estimated probabilities to a dataframe of
@@ -128,7 +124,7 @@
     QueryTypeField,
     QueryResponseCodeField))
 
-  val modelColumns = ModelSchema.fieldNames.toList.map(col)
+  val modelColumns: List[Column] = ModelSchema.fieldNames.toList.map(col)
 
   val DomainStatsSchema = StructType(List(TopDomainField, SubdomainLengthField, SubdomainEntropyField, NumPeriodsField))
 
@@ -136,7 +132,7 @@
     * Create a new DNS Suspicious Connects model by training it on a data frame and a feedback file.
     *
     * @param sparkSession Spark Session
-    * @param logger
+    * @param logger       Application logger
     * @param config       Analysis configuration object containing CLI parameters.
     *                     Contains the path to the feedback file in config.scoresFile
     * @param inputRecords Data used to train the model.
@@ -155,7 +151,6 @@
       config.feedbackFile,
       config.duplicationFactor))
 
-    val countryCodesBC = sparkSession.sparkContext.broadcast(CountryCodes.CountryCodes)
     val topDomainsBC = sparkSession.sparkContext.broadcast(TopDomains.TopDomains)
     val userDomain = config.userDomain
 
@@ -175,19 +170,20 @@
         .reduceByKey(_ + _)
         .map({ case ((ipDst, word), count) => SpotLDAInput(ipDst, word, count) })
 
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(ipDstWordCounts, config.precisionUtility, sparkSession)
 
-    val SpotLDAOutput(ipToTopicMix, wordToPerTopicProb) = SpotLDAWrapper.runLDA(sparkSession,
-      ipDstWordCounts,
-      config.topicCount,
+    val model: SpotLDAModel = SpotLDAWrapper.run(config.topicCount,
       logger,
       config.ldaPRGSeed,
       config.ldaAlpha,
       config.ldaBeta,
       config.ldaOptimizer,
       config.ldaMaxiterations,
-      config.precisionUtility)
+      spotLDAHelper)
 
-    new DNSSuspiciousConnectsModel(config.topicCount, ipToTopicMix, wordToPerTopicProb)
+    val results: SpotLDAResult = model.predict(spotLDAHelper)
+
+    new DNSSuspiciousConnectsModel(config.topicCount, results.documentToTopicMix, results.wordToTopicMix)
 
   }
 
@@ -205,15 +201,16 @@
                        userDomain: String,
                        url: String): TempFields = {
 
-    val DomainInfo(_, topDomainClass, subdomain, subdomainLength, subdomainEntropy, numPeriods) =
+    val DomainInfo(_, topDomainClass, _, subDomainLength, subDomainEntropy, numPeriods) =
       DomainProcessor.extractDomainInfo(url, topDomainsBC, userDomain)
 
 
     TempFields(topDomainClass = topDomainClass,
-      subdomainLength = subdomainLength,
-      subdomainEntropy = subdomainEntropy,
+      subdomainLength = subDomainLength,
+      subdomainEntropy = subDomainEntropy,
       numPeriods = numPeriods)
   }
 
   case class TempFields(topDomainClass: Int, subdomainLength: Integer, subdomainEntropy: Double, numPeriods: Integer)
+
 }
\ No newline at end of file
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAHelper.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAHelper.scala
new file mode 100644
index 0000000..e9f0b66
--- /dev/null
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAHelper.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spot.lda
+
+import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.udf
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spot.lda.SpotLDAWrapperSchema._
+import org.apache.spot.utilities.{FloatPointPrecisionUtility, FloatPointPrecisionUtility64}
+
+import scala.collection.immutable.Map
+
+/**
+  * Apache Spot routines to format Spark LDA input and output for scoring.
+  */
+class SpotLDAHelper(private final val sparkSession: SparkSession,
+                    final val docWordCount: RDD[SpotLDAInput],
+                    private final val documentDictionary: DataFrame,
+                    private final val wordDictionary: Map[String, Int],
+                    private final val precisionUtility: FloatPointPrecisionUtility = FloatPointPrecisionUtility64)
+  extends Serializable {
+
+  /**
+    * Format document word count as RDD[(Long, Vector)] - input data for LDA algorithm
+    *
+    * @return RDD of (document number, sparse word-count vector sized to the vocabulary)
+    */
+  val formattedCorpus: RDD[(Long, Vector)] = {
+    import sparkSession.implicits._
+
+    // Local copy so the UDF closure captures only the Map, not this helper (with its session, RDD and DataFrame).
+    val wordIds: Map[String, Int] = wordDictionary
+    val getWordId = udf((word: String) => wordIds(word))
+
+    val docWordCountDF = docWordCount
+      .map({ case SpotLDAInput(doc, word, count) => (doc, word, count) })
+      .toDF(DocumentName, WordName, WordNameWordCount)
+
+    // Convert SpotLDAInput into the format Spark LDA expects: (doc, word, count) -> word counts per doc, where the
+    // RDD is keyed by the document number assigned in documentDictionary
+    val wordCountsPerDocDF = docWordCountDF
+      .join(documentDictionary, docWordCountDF(DocumentName) === documentDictionary(DocumentName))
+      .drop(documentDictionary(DocumentName))
+      .withColumn(WordNumber, getWordId(docWordCountDF(WordName)))
+      .drop(WordName)
+
+    val wordCountsPerDoc: RDD[(Long, Iterable[(Int, Double)])]
+    = wordCountsPerDocDF
+      .select(DocumentNumber, WordNumber, WordNameWordCount)
+      .rdd
+      .map({ case Row(documentId: Long, wordId: Int, wordCount: Int) => (documentId, (wordId, wordCount.toDouble)) })
+      .groupByKey
+
+    // Vocabulary size (distinct words across the whole corpus), used as the size of every sparse vector
+    val numUniqueWords = wordDictionary.size
+    val ldaInput: RDD[(Long, Vector)] = wordCountsPerDoc
+      .mapValues(vs => Vectors.sparse(numUniqueWords, vs.toSeq))
+
+    ldaInput
+  }
+
+  /**
+    * Format LDA output topicDistribution for spot-ml scoring
+    *
+    * @param documentDistributions LDA model topicDistributions
+    * @return DataFrame of document name and its topic mix array, cast by precisionUtility
+    */
+  def formatDocumentDistribution(documentDistributions: RDD[(Long, Vector)]): DataFrame = {
+    import sparkSession.implicits._
+
+    val topicDistributionToArray = udf((topicDistribution: Vector) => topicDistribution.toArray)
+    val documentToTopicDistributionDF = documentDistributions.toDF(DocumentNumber, TopicProbabilityMix)
+
+    val documentToTopicDistributionArray = documentToTopicDistributionDF
+      .join(documentDictionary, documentToTopicDistributionDF(DocumentNumber) === documentDictionary(DocumentNumber))
+      .drop(documentDictionary(DocumentNumber))
+      .drop(documentToTopicDistributionDF(DocumentNumber))
+      .select(DocumentName, TopicProbabilityMix)
+      .withColumn(TopicProbabilityMixArray, topicDistributionToArray(documentToTopicDistributionDF(TopicProbabilityMix)))
+      .selectExpr(s"$DocumentName  AS $DocumentName", s"$TopicProbabilityMixArray AS $TopicProbabilityMix")
+
+    precisionUtility.castColumn(documentToTopicDistributionArray, TopicProbabilityMix)
+  }
+
+  /**
+    * Format LDA output topicMatrix for spot-ml scoring
+    *
+    * @param topicsMatrix LDA model topicMatrix
+    * @return word -> per-topic probabilities, each topic column normalized by its column sum
+    **/
+  def formatTopicDistributions(topicsMatrix: Matrix): Map[String, Array[Double]] = {
+    // Incoming word-topic matrix (rows = words, columns = topics) is column-major and its columns are unnormalized
+    val m = topicsMatrix.numRows
+    val n = topicsMatrix.numCols
+    val reverseWordDictionary = wordDictionary.map(_.swap)
+
+    val columnSums: Array[Double] = Range(0, n).map(j => Range(0, m).map(i => topicsMatrix(i, j)).sum).toArray
+
+    val wordProbabilities: Seq[Array[Double]] = topicsMatrix.transpose.toArray.grouped(n).toSeq
+      .map(unNormalizedProbabilities => unNormalizedProbabilities.zipWithIndex.map({ case (u, j) => u / columnSums(j) }))
+
+    wordProbabilities.zipWithIndex
+      .map({ case (topicProbabilities, wordInd) => (reverseWordDictionary(wordInd), topicProbabilities) }).toMap
+  }
+
+}
+
+object SpotLDAHelper {
+
+  /**
+    * Builds a SpotLDAHelper: caches the corpus and indexes its distinct words and documents.
+    *
+    * @param docWordCount     Document word count (corpus).
+    * @param precisionUtility Floating point precision utility the helper uses to cast formatted document distributions.
+    * @param sparkSession     Spark session used to build the dictionaries and by the helper for DataFrame conversions.
+    * @return a SpotLDAHelper holding the corpus RDD (cache() called above), the word -> index Map and the document -> index DataFrame.
+    */
+  def apply(docWordCount: RDD[SpotLDAInput],
+            precisionUtility: FloatPointPrecisionUtility,
+            sparkSession: SparkSession): SpotLDAHelper = {
+
+    import sparkSession.implicits._
+
+    val docWordCountCache = docWordCount.cache()
+
+    // Forcing an action to cache results.
+    docWordCountCache.count()
+
+    // Word -> index dictionary, built on the driver from the collected distinct words of the corpus
+    val wordDictionary: Map[String, Int] = {
+      val words = docWordCountCache
+        .map({ case SpotLDAInput(_, word, _) => word })
+        .distinct
+        .collect
+      words.zipWithIndex.toMap
+    }
+
+    val documentDictionary: DataFrame = docWordCountCache
+      .map({ case SpotLDAInput(doc, _, _) => doc })
+      .distinct
+      .zipWithIndex
+      .toDF(DocumentName, DocumentNumber)
+      .cache
+
+    new SpotLDAHelper(sparkSession, docWordCount, documentDictionary, wordDictionary, precisionUtility)
+  }
+
+}
+
+/**
+  * Spot LDA input case class
+  *
+  * @param doc   Document name.
+  * @param word  Word.
+  * @param count Times the word appears for the document.
+  */
+case class SpotLDAInput(doc: String, word: String, count: Int) extends Serializable
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAModel.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAModel.scala
new file mode 100644
index 0000000..181dc62
--- /dev/null
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAModel.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spot.lda
+
+import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDAModel, LocalLDAModel}
+import org.apache.spark.sql.SparkSession
+
+/**
+  * Spot wrapper around a Spark mllib LDAModel; implemented by SpotLocalLDAModel and SpotDistributedLDAModel.
+  */
+sealed trait SpotLDAModel {
+
+  /**
+    * Save the model to HDFS
+    *
+    * @param sparkSession Spark session; implementations take its SparkContext to save the model
+    * @param location     HDFS location to save the model to
+    */
+  def save(sparkSession: SparkSession, location: String): Unit
+
+  /**
+    * Predict topicDistributions and get topicsMatrix along with results formatted for Apache Spot scoring
+    *
+    * @param helper SpotLDAHelper providing the corpus and the dictionaries used to format the results
+    * @return SpotLDAResult with the raw and formatted topic distributions
+    */
+  def predict(helper: SpotLDAHelper): SpotLDAResult
+}
+
+/**
+  * Spark LocalLDAModel wrapper.
+  *
+  * @param ldaModel Spark LDA Model; a DistributedLDAModel is converted with toLocal when predicting.
+  */
+class SpotLocalLDAModel(final val ldaModel: LDAModel) extends SpotLDAModel {
+
+  /**
+    * Save the wrapped LDA model on HDFS location
+    *
+    * @param sparkSession the Spark session
+    * @param location     the HDFS location
+    */
+  override def save(sparkSession: SparkSession, location: String): Unit = {
+    val sparkContext = sparkSession.sparkContext
+
+    ldaModel.save(sparkContext, location)
+  }
+
+  /**
+    * Predict topicDistributions and get topicsMatrix along with results formatted for Apache Spot scoring.
+    * SpotLocalLDAModel.predict will use corpus from spotLDAHelper which can be a new set of documents or the same
+    * documents used for training. A DistributedLDAModel is converted with toLocal instead of failing on a cast.
+    *
+    * @param spotLDAHelper Spot LDA Helper object, can be the same used for training or a new instance with new
+    *                      documents.
+    * @return SpotLDAResult
+    */
+  override def predict(spotLDAHelper: SpotLDAHelper): SpotLDAResult = {
+    val localLDAModel: LocalLDAModel = ldaModel match {
+      case local: LocalLDAModel => local
+      case distributed: DistributedLDAModel => distributed.toLocal
+    }
+    val topicDistributions = localLDAModel.topicDistributions(spotLDAHelper.formattedCorpus)
+    val topicMix = localLDAModel.topicsMatrix
+    SpotLDAResult(spotLDAHelper, topicDistributions, topicMix)
+  }
+}
+
+/** Spark DistributedLDAModel wrapper.
+  * Ideally, this model should be used only for batch processing.
+  *
+  * @param ldaModel Spark LDA Model; must be a DistributedLDAModel, since predict casts it to one
+  */
+class SpotDistributedLDAModel(final val ldaModel: LDAModel) extends
+  SpotLDAModel {
+
+  /**
+    * Save DistributedLDAModel on HDFS location
+    *
+    * @param sparkSession the Spark session
+    * @param location     the HDFS location
+    */
+  override def save(sparkSession: SparkSession, location: String): Unit = {
+    val sparkContext = sparkSession.sparkContext
+
+    ldaModel.save(sparkContext, location)
+  }
+
+  /**
+    * Predict topicDistributions and get topicsMatrix along with results formatted for Apache Spot scoring.
+    * SpotDistributedLDAModel.predict returns the distributions of the training documents; it can't predict on new
+    * documents. Pass the same spotLDAHelper used for training so document numbers match its document dictionary.
+    *
+    * @param spotLDAHelper Spot LDA Helper object used for training
+    * @return SpotLDAResult
+    */
+  override def predict(spotLDAHelper: SpotLDAHelper): SpotLDAResult = {
+
+    val distributedLDAModel: DistributedLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]
+
+    val topicDistributions = distributedLDAModel.topicDistributions
+    val topicsMatrix = distributedLDAModel.topicsMatrix
+
+    SpotLDAResult(spotLDAHelper, topicDistributions, topicsMatrix)
+  }
+}
+
+object SpotLDAModel {
+
+  /**
+    * Factory method, based on instance of ldaModel will generate an object based on DistributedLDAModel
+    * implementation or LocalLDAModel.
+    *
+    * @param ldaModel Spark LDAModel
+    * @return SpotDistributedLDAModel for a DistributedLDAModel, SpotLocalLDAModel for a LocalLDAModel
+    */
+  def apply(ldaModel: LDAModel): SpotLDAModel = {
+    // No default case: an LDAModel of any other type raises a scala.MatchError.
+    ldaModel match {
+      case model: DistributedLDAModel => new SpotDistributedLDAModel(model)
+      case model: LocalLDAModel => new SpotLocalLDAModel(model)
+    }
+  }
+}
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAResult.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAResult.scala
new file mode 100644
index 0000000..a91cee2
--- /dev/null
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAResult.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spot.lda
+
+import org.apache.spark.mllib.linalg.{Matrix, Vector}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+
+/**
+  * LDA results formatted for Apache Spot scoring.
+  * Holds the raw LDA output (topicDistributions, topicsMix) next to its formatted forms (documentToTopicMix, wordToTopicMix).
+  */
+class SpotLDAResult(private final val helper: SpotLDAHelper,
+                    final val topicDistributions: RDD[(Long, Vector)],
+                    final val documentToTopicMix: DataFrame,
+                    final val topicsMix: Matrix,
+                    final val wordToTopicMix: Map[String, Array[Double]])
+
+object SpotLDAResult {
+  /** Formats topicDistributions and topicsMix with the helper and wraps raw and formatted results together. */
+  def apply(helper: SpotLDAHelper, topicDistributions: RDD[(Long, Vector)], topicsMix: Matrix): SpotLDAResult = {
+
+    val documentToTopicMix: DataFrame = helper.formatDocumentDistribution(topicDistributions)
+    val wordToTopicMix: Map[String, Array[Double]] = helper.formatTopicDistributions(topicsMix)
+
+    new SpotLDAResult(helper, topicDistributions, documentToTopicMix, topicsMix, wordToTopicMix)
+  }
+}
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
index 122e8ed..7a8b67e 100644
--- a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
@@ -18,19 +18,15 @@
 package org.apache.spot.lda
 
 import org.apache.log4j.Logger
-import org.apache.spark.mllib.clustering._
-import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.clustering.{LDAModel, _}
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spot.lda.SpotLDAWrapperSchema._
-import org.apache.spot.utilities.FloatPointPrecisionUtility
-
-import scala.collection.immutable.Map
+import org.apache.spark.sql.SparkSession
 
 /**
   * Spark LDA implementation
-  * Contains routines for LDA using Scala Spark implementation from mllib
+  * Contains routines for LDA using Scala Spark implementation from org.apache.spark.mllib.clustering
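-  * 1. Creates list of unique documents, words and model based on those two
+  * 1. Takes a SpotLDAHelper, which builds the unique document and word dictionaries and the formatted corpus
   * 2. Processes the model using Spark LDA
-  * 3. Reads Spark LDA results: Topic distributions per document (docTopicDist) and word distributions per topic (wordTopicMat)
+  * 3. Returns a SpotLDAModel; its predict method reads topic distributions per document and word distributions per topic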
@@ -42,8 +38,6 @@
   /**
     * Runs Spark LDA and returns a new model.
     *
-    * @param sparkSession       the SparkSession
-    * @param docWordCount       RDD with document list and the word count for each document (corpus)
     * @param topicCount         number of topics to find
     * @param logger             application logger
     * @param ldaSeed            LDA seed
@@ -51,51 +45,20 @@
     * @param ldaBeta            topic concentration
     * @param ldaOptimizerOption LDA optimizer, em or online
     * @param maxIterations      maximum number of iterations for the optimizer
-    * @param precisionUtility   FloatPointPrecisionUtility implementation based on user configuration (64 or 32 bit)
     * @return
     */
-  def runLDA(sparkSession: SparkSession,
-             docWordCount: RDD[SpotLDAInput],
-             topicCount: Int,
-             logger: Logger,
-             ldaSeed: Option[Long],
-             ldaAlpha: Double,
-             ldaBeta: Double,
-             ldaOptimizerOption: String,
-             maxIterations: Int,
-             precisionUtility: FloatPointPrecisionUtility): SpotLDAOutput = {
+  def run(topicCount: Int,
+          logger: Logger,
+          ldaSeed: Option[Long],
+          ldaAlpha: Double,
+          ldaBeta: Double,
+          ldaOptimizerOption: String,
+          maxIterations: Int,
+          helper: SpotLDAHelper): SpotLDAModel = {
 
-    import sparkSession.implicits._
-
-    val docWordCountCache = docWordCount.cache()
-
-    // Forcing an action to cache results.
-    docWordCountCache.count()
-
-    // Create word Map Word,Index for further usage
-    val wordDictionary: Map[String, Int] = {
-      val words = docWordCountCache
-        .map({ case SpotLDAInput(doc, word, count) => word })
-        .distinct
-        .collect
-      words.zipWithIndex.toMap
-    }
-
-    val documentDictionary: DataFrame = docWordCountCache
-      .map({ case SpotLDAInput(doc, word, count) => doc })
-      .distinct
-      .zipWithIndex
-      .toDF(DocumentName, DocumentNumber)
-      .cache
 
     // Structure corpus so that the index is the docID, values are the vectors of word occurrences in that doc
-    val ldaCorpus: RDD[(Long, Vector)] =
-      formatSparkLDAInput(docWordCountCache,
-        documentDictionary,
-        wordDictionary,
-        sparkSession)
-
-    docWordCountCache.unpersist()
+    val ldaCorpus: RDD[(Long, Vector)] = helper.formattedCorpus
 
     // Instantiate optimizer based on input
     val ldaOptimizer = ldaOptimizerOption match {
@@ -121,162 +84,35 @@
         .setBeta(ldaBeta)
         .setOptimizer(ldaOptimizer)
 
-    // If caller does not provide seed to lda, ie. ldaSeed is empty, lda is seeded automatically set to hash value of class name
-
+    // If the caller does not provide a seed (ldaSeed is empty), setSeed is skipped and Spark's default is kept.
+    // NOTE(review): previously documented as the hash of the class name; confirm for spark.mllib's LDA.
     if (ldaSeed.nonEmpty) {
       lda.setSeed(ldaSeed.get)
     }
 
-    val (wordTopicMat, docTopicDist) = ldaOptimizer match {
-      case _: EMLDAOptimizer => {
-        val ldaModel = lda.run(ldaCorpus).asInstanceOf[DistributedLDAModel]
+    val model: LDAModel = lda.run(ldaCorpus)
 
-        // Get word topic mix, from Spark documentation:
-        // Inferred topics, where each topic is represented by a distribution over terms.
-        // This is a matrix of size vocabSize x k, where each column is a topic.
-        // No guarantees are given about the ordering of the topics.
-        val wordTopicMat: Matrix = ldaModel.topicsMatrix
+    SpotLDAModel(model)
+  }
 
-        // Topic distribution: for each document, return distribution (vector) over topics for that docs where entry
-        // i is the fraction of the document which belongs to topic i
-        val docTopicDist: RDD[(Long, Vector)] = ldaModel.topicDistributions
+  /**
+    * Load an existing model from HDFS location.
+    *
+    * @param sparkSession       the Spark session.
+    * @param location           the HDFS location for the model.
+    * @param ldaOptimizerOption LDA optimizer, em or online.
+    * @return SpotLDAModel
+    */
+  def load(sparkSession: SparkSession, location: String, ldaOptimizerOption: String): SpotLDAModel = {
+    val sparkContext: SparkContext = sparkSession.sparkContext
 
-        (wordTopicMat, docTopicDist)
-
-      }
-
-      case _: OnlineLDAOptimizer => {
-        val ldaModel = lda.run(ldaCorpus).asInstanceOf[LocalLDAModel]
-
-        // Get word topic mix, from Spark documentation:
-        // Inferred topics, where each topic is represented by a distribution over terms.
-        // This is a matrix of size vocabSize x k, where each column is a topic.
-        // No guarantees are given about the ordering of the topics.
-        val wordTopicMat: Matrix = ldaModel.topicsMatrix
-
-        // Topic distribution: for each document, return distribution (vector) over topics for that docs where entry
-        // i is the fraction of the document which belongs to topic i
-        val docTopicDist: RDD[(Long, Vector)] = ldaModel.topicDistributions(ldaCorpus)
-
-        (wordTopicMat, docTopicDist)
-
-      }
-
+    val model = ldaOptimizerOption match {
+      case "em" => DistributedLDAModel.load(sparkContext, location)
+      case "online" => LocalLDAModel.load(sparkContext, location)
+      case _ => throw new IllegalArgumentException(
+        s"Invalid LDA optimizer $ldaOptimizerOption")
     }
 
-    // Create doc results from vector: convert docID back to string, convert vector of probabilities to array
-    val docToTopicMixDF =
-      formatSparkLDADocTopicOutput(docTopicDist, documentDictionary, sparkSession, precisionUtility)
-
-    documentDictionary.unpersist()
-
-    // Create word results from matrix: convert matrix to sequence, wordIDs back to strings, sequence of
-    // probabilities to array
-    val revWordMap: Map[Int, String] = wordDictionary.map(_.swap)
-
-    val wordResults = formatSparkLDAWordOutput(wordTopicMat, revWordMap)
-
-    // Create output object
-    SpotLDAOutput(docToTopicMixDF, wordResults)
+    SpotLDAModel(model)
   }
-
-  /**
-    * Formats input data for LDA algorithm
-    *
-    * @param docWordCount       RDD with document list and the word count for each document (corpus)
-    * @param documentDictionary DataFrame with a distinct list of documents and its id
-    * @param wordDictionary     immutable Map with distinct list of word and its id
-    * @param sparkSession       the SparkSession
-    * @return
-    */
-  def formatSparkLDAInput(docWordCount: RDD[SpotLDAInput],
-                          documentDictionary: DataFrame,
-                          wordDictionary: Map[String, Int],
-                          sparkSession: SparkSession): RDD[(Long, Vector)] = {
-
-    import sparkSession.implicits._
-
-    val getWordId = {
-      udf((word: String) => (wordDictionary(word)))
-    }
-
-    val docWordCountDF = docWordCount
-      .map({ case SpotLDAInput(doc, word, count) => (doc, word, count) })
-      .toDF(DocumentName, WordName, WordNameWordCount)
-
-    // Convert SpotSparkLDAInput into desired format for Spark LDA: (doc, word, count) -> word count per doc, where RDD
-    // is indexed by DocID
-    val wordCountsPerDocDF = docWordCountDF
-      .join(documentDictionary, docWordCountDF(DocumentName) === documentDictionary(DocumentName))
-      .drop(documentDictionary(DocumentName))
-      .withColumn(WordNumber, getWordId(docWordCountDF(WordName)))
-      .drop(WordName)
-
-    val wordCountsPerDoc: RDD[(Long, Iterable[(Int, Double)])]
-    = wordCountsPerDocDF
-      .select(DocumentNumber, WordNumber, WordNameWordCount)
-      .rdd
-      .map({ case Row(documentId: Long, wordId: Int, wordCount: Int) => (documentId.toLong, (wordId, wordCount.toDouble)) })
-      .groupByKey
-
-    // Sum of distinct words in each doc (words will be repeated between different docs), used for sparse vec size
-    val numUniqueWords = wordDictionary.size
-    val ldaInput: RDD[(Long, Vector)] = wordCountsPerDoc
-      .mapValues({ case vs => Vectors.sparse(numUniqueWords, vs.toSeq) })
-
-    ldaInput
-  }
-
-  /**
-    * Format LDA output topicMatrix for spot-ml scoring
-    *
-    * @param wordTopMat LDA model topicMatrix
-    * @param wordMap    immutable Map with distinct list of word and its id
-    * @return
-    */
-  def formatSparkLDAWordOutput(wordTopMat: Matrix, wordMap: Map[Int, String]): scala.Predef.Map[String, Array[Double]] = {
-
-    // incoming word top matrix is in column-major order and the columns are unnormalized
-    val m = wordTopMat.numRows
-    val n = wordTopMat.numCols
-    val columnSums: Array[Double] = Range(0, n).map(j => (Range(0, m).map(i => wordTopMat(i, j)).sum)).toArray
-
-    val wordProbs: Seq[Array[Double]] = wordTopMat.transpose.toArray.grouped(n).toSeq
-      .map(unnormProbs => unnormProbs.zipWithIndex.map({ case (u, j) => u / columnSums(j) }))
-
-    wordProbs.zipWithIndex.map({ case (topicProbs, wordInd) => (wordMap(wordInd), topicProbs) }).toMap
-  }
-
-  /**
-    * Format LDA output topicDistribution for spot-ml scoring
-    *
-    * @param docTopDist         LDA model topicDistribution
-    * @param documentDictionary DataFrame with a distinct list of documents and its id
-    * @param sparkSession       the SparkSession
-    * @param precisionUtility   FloatPointPrecisionUtility implementation based on user configuration (64 or 32 bit)
-    * @return
-    */
-  def formatSparkLDADocTopicOutput(docTopDist: RDD[(Long, Vector)], documentDictionary: DataFrame, sparkSession: SparkSession,
-                                   precisionUtility: FloatPointPrecisionUtility):
-  DataFrame = {
-    import sparkSession.implicits._
-
-    val topicDistributionToArray = udf((topicDistribution: Vector) => topicDistribution.toArray)
-    val documentToTopicDistributionDF = docTopDist.toDF(DocumentNumber, TopicProbabilityMix)
-
-    val documentToTopicDistributionArray = documentToTopicDistributionDF
-      .join(documentDictionary, documentToTopicDistributionDF(DocumentNumber) === documentDictionary(DocumentNumber))
-      .drop(documentDictionary(DocumentNumber))
-      .drop(documentToTopicDistributionDF(DocumentNumber))
-      .select(DocumentName, TopicProbabilityMix)
-      .withColumn(TopicProbabilityMixArray, topicDistributionToArray(documentToTopicDistributionDF(TopicProbabilityMix)))
-      .selectExpr(s"$DocumentName  AS $DocumentName", s"$TopicProbabilityMixArray AS $TopicProbabilityMix")
-
-    precisionUtility.castColumn(documentToTopicDistributionArray, TopicProbabilityMix)
-  }
-
-  case class SpotLDAInput(doc: String, word: String, count: Int) extends Serializable
-
-  case class SpotLDAOutput(docToTopicMix: DataFrame, wordResults: Map[String, Array[Double]])
-
 }
\ No newline at end of file
diff --git a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
index 6be11e1..4e09616 100644
--- a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
@@ -20,11 +20,10 @@
 import org.apache.log4j.Logger
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
-import org.apache.spot.lda.SpotLDAWrapper
-import org.apache.spot.lda.SpotLDAWrapper.{SpotLDAInput, SpotLDAOutput}
 import org.apache.spot.lda.SpotLDAWrapperSchema._
+import org.apache.spot.lda.{SpotLDAHelper, SpotLDAInput, SpotLDAResult, SpotLDAWrapper}
 import org.apache.spot.netflow.FlowSchema._
 import org.apache.spot.netflow.FlowWordCreator
 import org.apache.spot.utilities.FloatPointPrecisionUtility
@@ -36,7 +35,7 @@
   * The model uses a topic-modelling approach that:
   * 1. Simplifies netflow records into words, one word at the source IP and another (possibly different) at the
   * destination IP.
-  * 2. The netflow words about each IP are treated as collections of thes words.
+  * 2. The netflow words about each IP are treated as collections of these words.
   * 3. A topic modelling approach is used to infer a collection of "topics" that represent common profiles
   * of network traffic. These "topics" are probability distributions on words.
   * 4. Each IP has a mix of topics corresponding to its behavior.
@@ -112,7 +111,7 @@
 }
 
 /**
-  * Contains dataframe schema information as well as the train-from-dataframe routine
+  * Contains DataFrame schema information as well as the train-from-dataframe routine
   * (which is a kind of factory routine) for [[FlowSuspiciousConnectsModel]] instances.
   *
   */
@@ -127,7 +126,7 @@
     IbytField,
     IpktField))
 
-  val ModelColumns = ModelSchema.fieldNames.toList.map(col)
+  val ModelColumns: List[Column] = ModelSchema.fieldNames.toList.map(col)
 
 
   def trainModel(sparkSession: SparkSession,
@@ -146,13 +145,12 @@
       config.duplicationFactor))
 
 
+    import sparkSession.implicits._
     // simplify netflow log entries into "words"
 
     val dataWithWords = totalRecords.withColumn(SourceWord, FlowWordCreator.srcWordUDF(ModelColumns: _*))
       .withColumn(DestinationWord, FlowWordCreator.dstWordUDF(ModelColumns: _*))
 
-    import sparkSession.implicits._
-
     // Aggregate per-word counts at each IP
     val srcWordCounts = dataWithWords
       .filter(dataWithWords(SourceWord).notEqual(InvalidDataHandler.WordError))
@@ -173,20 +171,19 @@
         .reduceByKey(_ + _)
         .map({ case ((ip, word), count) => SpotLDAInput(ip, word, count) })
 
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(ipWordCounts, config.precisionUtility, sparkSession)
 
-    val SpotLDAOutput(ipToTopicMix, wordToPerTopicProb) = SpotLDAWrapper.runLDA(sparkSession,
-      ipWordCounts,
-      config.topicCount,
+    val model = SpotLDAWrapper.run(config.topicCount,
       logger,
       config.ldaPRGSeed,
       config.ldaAlpha,
       config.ldaBeta,
       config.ldaOptimizer,
       config.ldaMaxiterations,
-      config.precisionUtility)
+      spotLDAHelper)
 
-    new FlowSuspiciousConnectsModel(config.topicCount,
-      ipToTopicMix,
-      wordToPerTopicProb)
+    val results: SpotLDAResult = model.predict(spotLDAHelper)
+
+    new FlowSuspiciousConnectsModel(config.topicCount, results.documentToTopicMix, results.wordToTopicMix)
   }
 }
\ No newline at end of file
diff --git a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
index 3ef60af..7332fe4 100644
--- a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
@@ -25,9 +25,8 @@
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.SuspiciousConnectsScoreFunction
-import org.apache.spot.lda.SpotLDAWrapper
-import org.apache.spot.lda.SpotLDAWrapper.{SpotLDAInput, SpotLDAOutput}
 import org.apache.spot.lda.SpotLDAWrapperSchema._
+import org.apache.spot.lda.{SpotLDAHelper, SpotLDAInput, SpotLDAResult, SpotLDAWrapper}
 import org.apache.spot.proxy.ProxySchema._
 import org.apache.spot.utilities._
 import org.apache.spot.utilities.data.validation.InvalidDataHandler
@@ -92,7 +91,7 @@
   */
 object ProxySuspiciousConnectsModel {
 
-  // These buckets are optimized to datasets used for training. Last bucket is of infinite size to ensure fit.
+  // These buckets are optimized to data sets used for training. Last bucket is of infinite size to ensure fit.
   // The maximum value of entropy is given by log k where k is the number of distinct categories.
   // Given that the alphabet and number of characters is finite the maximum value for entropy is upper bounded.
   // Bucket number and size can be changed to provide less/more granularity
@@ -119,8 +118,8 @@
     * for clustering in the topic model.
     *
     * @param sparkSession Spark Session
-    * @param logger       Logge object.
-    * @param config       SuspiciousConnetsArgumnetParser.Config object containg CLI arguments.
+    * @param logger       Logger object.
+    * @param config       SuspiciousConnectsArgumentParser.Config object containing CLI arguments.
     * @param inputRecords Dataframe for training data, with columns Host, Time, ReqMethod, FullURI, ResponseContentType,
     *                     UserAgent, RespCode (as defined in ProxySchema object).
     * @return ProxySuspiciousConnectsModel
@@ -130,7 +129,7 @@
                  config: SuspiciousConnectsConfig,
                  inputRecords: DataFrame): ProxySuspiciousConnectsModel = {
 
-    logger.info("training new proxy suspcious connects model")
+    logger.info("training new proxy suspicious connects model")
 
 
     val selectedRecords =
@@ -145,24 +144,24 @@
         .reduceByKey(_ + _).collect()
         .toMap
 
-    val agentToCountBC = sparkSession.sparkContext.broadcast(agentToCount)
-
     val docWordCount: RDD[SpotLDAInput] =
       getIPWordCounts(sparkSession, logger, selectedRecords, config.feedbackFile, config.duplicationFactor,
         agentToCount)
 
-    val SpotLDAOutput(ipToTopicMixDF, wordResults) = SpotLDAWrapper.runLDA(sparkSession,
-      docWordCount,
-      config.topicCount,
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(docWordCount, config.precisionUtility, sparkSession)
+
+    val model = SpotLDAWrapper.run(config.topicCount,
       logger,
       config.ldaPRGSeed,
       config.ldaAlpha,
       config.ldaBeta,
       config.ldaOptimizer,
       config.ldaMaxiterations,
-      config.precisionUtility)
+      spotLDAHelper)
 
-    new ProxySuspiciousConnectsModel(config.topicCount, ipToTopicMixDF, wordResults)
+    val results: SpotLDAResult = model.predict(spotLDAHelper)
+
+    new ProxySuspiciousConnectsModel(config.topicCount, results.documentToTopicMix, results.wordToTopicMix)
 
   }
 
diff --git a/spot-ml/src/main/scala/org/apache/spot/utilities/TopDomains.scala b/spot-ml/src/main/scala/org/apache/spot/utilities/TopDomains.scala
index 0183027..083cfe7 100644
--- a/spot-ml/src/main/scala/org/apache/spot/utilities/TopDomains.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/utilities/TopDomains.scala
@@ -28,7 +28,6 @@
 
   val TopDomains: Set[String] = Source.fromFile(alexaTop1MPath).getLines.map(line => {
     val parts = line.split(",")
-    val l = parts.length
     parts(1).split('.')(0)
   }).toSet
 }
diff --git a/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAHelperTest.scala b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAHelperTest.scala
new file mode 100644
index 0000000..93828b2
--- /dev/null
+++ b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAHelperTest.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spot.lda
+
+import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spot.lda.SpotLDAWrapperSchema.TopicProbabilityMix
+import org.apache.spot.testutils.TestingSparkContextFlatSpec
+import org.apache.spot.utilities.{FloatPointPrecisionUtility32, FloatPointPrecisionUtility64}
+import org.scalatest.Matchers
+
+/**
+  * Unit tests for [[SpotLDAHelper]]: corpus formatting for Spark LDA and formatting of LDA outputs.
+  */
+class SpotLDAHelperTest extends TestingSparkContextFlatSpec with Matchers {
+
+  "formattedCorpus" should "return input in RDD[(Long, Vector)] (collected as Array for testing) format. The index " +
+    "is the docID, values are the vectors of word occurrences in that doc" in {
+    // Three distinct documents ("192.168.1.1" appears twice) over a vocabulary of four distinct words.
+
+    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
+      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
+      SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
+      SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
+
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(documentWordData, FloatPointPrecisionUtility64, sparkSession)
+
+    val sparkLDAInput: RDD[(Long, Vector)] = spotLDAHelper.formattedCorpus
+    val sparkLDAInArr: Array[(Long, Vector)] = sparkLDAInput.collect()
+
+    sparkLDAInArr shouldBe Array((0, Vectors.sparse(4, Array(0, 3), Array(5.0, 8.0))),
+      (2, Vectors.sparse(4, Array(1), Array(2.0))), (1, Vectors.sparse(4, Array(2), Array(4.0))))
+  }
+
+  "formatDocumentDistribution" should "return a DataFrame of (documentName, Array[Double]) after converting doc results " +
+    "using FloatPointPrecisionUtility64: convert docID back to string, convert vector of probabilities to array" in {
+
+    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
+      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
+      SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
+      SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
+
+    val docTopicDist: RDD[(Long, Vector)] = sparkSession.sparkContext.parallelize(
+      Array((0.toLong, Vectors.dense(0.15, 0.3, 0.5, 0.05)),
+        (1.toLong, Vectors.dense(0.25, 0.15, 0.4, 0.2)),
+        (2.toLong, Vectors.dense(0.4, 0.1, 0.3, 0.2))))
+
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(documentWordData, FloatPointPrecisionUtility64, sparkSession)
+
+    val sparkDocRes: DataFrame = spotLDAHelper.formatDocumentDistribution(docTopicDist)
+
+    import testImplicits._
+    val documents = sparkDocRes.map({ case Row(documentName: String, docProbabilities: Seq[Double]) => (documentName,
+      docProbabilities)
+    }).collect
+
+    val documentProbabilities = sparkDocRes.select(TopicProbabilityMix).first.toSeq(0).asInstanceOf[Seq[Double]]
+
+    documents should contain("192.168.1.1", Seq(0.15, 0.3, 0.5, 0.05))
+    documents should contain("10.10.98.123", Seq(0.25, 0.15, 0.4, 0.2))
+    documents should contain("66.23.45.11", Seq(0.4, 0.1, 0.3, 0.2))
+
+    documentProbabilities(0) shouldBe a[java.lang.Double]
+
+  }
+
+  it should "return a DataFrame of (documentName, Array[Float]) after converting doc results " +
+    "using FloatPointPrecisionUtility32: convert docID back to string, convert vector of probabilities to array" in {
+
+    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
+      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
+      SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
+      SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
+
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(documentWordData, FloatPointPrecisionUtility32, sparkSession)
+
+    val docTopicDist: RDD[(Long, Vector)] = sparkSession.sparkContext.parallelize(
+      Array((0.toLong, Vectors.dense(0.15, 0.3, 0.5, 0.05)),
+        (1.toLong, Vectors.dense(0.25, 0.15, 0.4, 0.2)),
+        (2.toLong, Vectors.dense(0.4, 0.1, 0.3, 0.2))))
+
+    val sparkDocRes: DataFrame = spotLDAHelper.formatDocumentDistribution(docTopicDist)
+
+    import testImplicits._
+    val documents = sparkDocRes.map({ case Row(documentName: String, docProbabilities: Seq[Float]) => (documentName,
+      docProbabilities)
+    }).collect
+
+    val documentProbabilities = sparkDocRes.select(TopicProbabilityMix).first.toSeq(0).asInstanceOf[Seq[Float]]
+
+    documents should contain("192.168.1.1", Seq(0.15f, 0.3f, 0.5f, 0.05f))
+    documents should contain("10.10.98.123", Seq(0.25f, 0.15f, 0.4f, 0.2f))
+    documents should contain("66.23.45.11", Seq(0.4f, 0.1f, 0.3f, 0.2f))
+
+    documentProbabilities(0) shouldBe a[java.lang.Float]
+  }
+
+  "formatTopicDistributions" should "return Map[String, Array[Double]] after converting word matrix to sequence, " +
+    "wordIDs back to strings, and sequence of probabilities to array" in {
+
+    val testMat = Matrices.dense(4, 4, Array(0.5, 0.2, 0.05, 0.25, 0.25, 0.1, 0.15, 0.5, 0.1, 0.4, 0.25, 0.25, 0.7, 0.2, 0.02, 0.08))
+
+    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "23.0_7.0_7.0_4.0", 8),
+      SpotLDAInput("10.10.98.123", "80.0_7.0_7.0_4.0", 4),
+      SpotLDAInput("66.23.45.11", "333333.0_7.0_7.0_4.0", 2),
+      SpotLDAInput("192.168.1.2", "-1_23.0_7.0_7.0_4.0", 5)))
+
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(documentWordData, FloatPointPrecisionUtility64, sparkSession)
+
+    val sparkWordRes = spotLDAHelper.formatTopicDistributions(testMat)
+
+    sparkWordRes should contain key ("23.0_7.0_7.0_4.0")
+    sparkWordRes should contain key ("80.0_7.0_7.0_4.0")
+    sparkWordRes should contain key ("333333.0_7.0_7.0_4.0")
+    sparkWordRes should contain key ("-1_23.0_7.0_7.0_4.0")
+  }
+}
diff --git a/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
index 5c40068..ae25d89 100644
--- a/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
@@ -18,25 +18,18 @@
 package org.apache.spot.lda
 
 import org.apache.log4j.{Level, LogManager}
-import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spot.lda.SpotLDAWrapper._
 import org.apache.spot.lda.SpotLDAWrapperSchema._
 import org.apache.spot.testutils.TestingSparkContextFlatSpec
 import org.apache.spot.utilities.{FloatPointPrecisionUtility32, FloatPointPrecisionUtility64}
 import org.scalatest.Matchers
 
-import scala.collection.immutable.Map
-
 class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
 
   "SparkLDA" should "handle an extremely unbalanced two word doc with EM optimizer" in {
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
-    val ldaAlpha =  1.02
+    val ldaAlpha = 1.02
     val ldaBeta = 1.001
     val ldaMaxIterations = 20
 
@@ -46,16 +39,20 @@
     val dogWorld = SpotLDAInput("pets", "dog", 999)
 
     val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      optimizer ,ldaMaxIterations, FloatPointPrecisionUtility64)
 
-    val topicMixDF = out.docToTopicMix
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility64, sparkSession)
+    val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, spotLDAHelper)
+
+    val results = model.predict(spotLDAHelper)
+
+    val topicMixDF = results.documentToTopicMix
 
     val topicMix =
-      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq(0)
+      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq.head
         .asInstanceOf[Seq[Double]].toArray
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
+    val catTopics = results.wordToTopicMix("cat")
+    val dogTopics = results.wordToTopicMix("dog")
 
     Math.abs(topicMix(0) * catTopics(0) + topicMix(1) * catTopics(1)) should be < 0.01
     Math.abs(0.999 - (topicMix(0) * dogTopics(0) + topicMix(1) * dogTopics(1))) should be < 0.01
@@ -65,9 +62,9 @@
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
-    val ldaAlpha =  1.2
+    val ldaAlpha = 1.02
     val ldaBeta = 1.001
-    val ldaMaxIterations = 20
+    val ldaMaxIterations = 100
 
     val optimizer = "em"
 
@@ -75,20 +72,24 @@
     val dogWorld = SpotLDAInput("dog world", "dog", 1)
 
     val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      optimizer ,ldaMaxIterations, FloatPointPrecisionUtility64)
 
-    val topicMixDF = out.docToTopicMix
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility64, sparkSession)
+    val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, spotLDAHelper)
+
+    val results = model.predict(spotLDAHelper)
+
+    val topicMixDF = results.documentToTopicMix
     val dogTopicMix: Array[Double] =
       topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first()
-        .toSeq(0).asInstanceOf[Seq[Double]].toArray
+        .toSeq.head.asInstanceOf[Seq[Double]].toArray
 
     val catTopicMix: Array[Double] =
       topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first()
-        .toSeq(0).asInstanceOf[Seq[Double]].toArray
+        .toSeq.head.asInstanceOf[Seq[Double]].toArray
 
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
+    val catTopics = results.wordToTopicMix("cat")
+    val dogTopics = results.wordToTopicMix("dog")
 
     Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
     Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
@@ -98,7 +99,7 @@
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
-    val ldaAlpha =  0.0009
+    val ldaAlpha = 0.0009
     val ldaBeta = 0.00001
     val ldaMaxIterations = 400
 
@@ -108,16 +109,20 @@
     val dogWorld = SpotLDAInput("pets", "dog", 999)
 
     val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      optimizer, ldaMaxIterations, FloatPointPrecisionUtility64)
 
-    val topicMixDF = out.docToTopicMix
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility64, sparkSession)
+    val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, spotLDAHelper)
+
+    val results = model.predict(spotLDAHelper)
+
+    val topicMixDF = results.documentToTopicMix
 
     val topicMix =
-      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq(0)
+      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq.head
         .asInstanceOf[Seq[Double]].toArray
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
+    val catTopics = results.wordToTopicMix("cat")
+    val dogTopics = results.wordToTopicMix("dog")
 
     Math.abs(topicMix(0) * catTopics(0) + topicMix(1) * catTopics(1)) should be < 0.01
     Math.abs(0.999 - (topicMix(0) * dogTopics(0) + topicMix(1) * dogTopics(1))) should be < 0.01
@@ -127,7 +132,7 @@
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
-    val ldaAlpha =  0.0009
+    val ldaAlpha = 0.0009
     val ldaBeta = 0.00001
     val ldaMaxIterations = 400
     val optimizer = "online"
@@ -136,20 +141,25 @@
     val dogWorld = SpotLDAInput("dog world", "dog", 1)
 
     val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      optimizer, ldaMaxIterations, FloatPointPrecisionUtility64)
 
-    val topicMixDF = out.docToTopicMix
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility64, sparkSession)
+    val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, spotLDAHelper)
+
+    val results = model.predict(spotLDAHelper)
+
+    val topicMixDF = results.documentToTopicMix
+
     val dogTopicMix: Array[Double] =
       topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first()
-        .toSeq(0).asInstanceOf[Seq[Double]].toArray
+        .toSeq.head.asInstanceOf[Seq[Double]].toArray
 
     val catTopicMix: Array[Double] =
       topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first()
-        .toSeq(0).asInstanceOf[Seq[Double]].toArray
+        .toSeq.head.asInstanceOf[Seq[Double]].toArray
 
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
+    val catTopics = results.wordToTopicMix("cat")
+    val dogTopics = results.wordToTopicMix("dog")
 
     Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
     Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
@@ -159,7 +169,7 @@
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
-    val ldaAlpha =  1.02
+    val ldaAlpha = 1.02
     val ldaBeta = 1.001
     val ldaMaxIterations = 20
 
@@ -169,16 +179,20 @@
     val dogWorld = SpotLDAInput("pets", "dog", 999)
 
     val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      optimizer, ldaMaxIterations, FloatPointPrecisionUtility32)
 
-    val topicMixDF = out.docToTopicMix
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility32, sparkSession)
+    val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, spotLDAHelper)
+
+    val results = model.predict(spotLDAHelper)
+
+    val topicMixDF = results.documentToTopicMix
 
     val topicMix =
-      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq(0)
+      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq.head
         .asInstanceOf[Seq[Float]].toArray
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
+    val catTopics = results.wordToTopicMix("cat")
+    val dogTopics = results.wordToTopicMix("dog")
 
     Math.abs(topicMix(0).toDouble * catTopics(0) + topicMix(1).toDouble * catTopics(1)) should be < 0.01
     Math.abs(0.999 - (topicMix(0).toDouble * dogTopics(0) + topicMix(1).toDouble * dogTopics(1))) should be < 0.01
@@ -188,7 +202,7 @@
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
-    val ldaAlpha =  1.02
+    val ldaAlpha = 1.02
     val ldaBeta = 1.001
     val ldaMaxIterations = 20
 
@@ -198,134 +212,28 @@
     val dogWorld = SpotLDAInput("dog world", "dog", 1)
 
     val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      optimizer, ldaMaxIterations, FloatPointPrecisionUtility32)
 
-    val topicMixDF = out.docToTopicMix
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility32, sparkSession)
+    val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, spotLDAHelper)
+
+    val results = model.predict(spotLDAHelper)
+
+    val topicMixDF = results.documentToTopicMix
+
     val dogTopicMix: Array[Float] =
-      topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first().toSeq(0)
+      topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first().toSeq.head
         .asInstanceOf[Seq[Float]].toArray
 
     val catTopicMix: Array[Float] =
-      topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first().toSeq(0)
+      topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first().toSeq.head
         .asInstanceOf[Seq[Float]].toArray
 
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
+    val catTopics = results.wordToTopicMix("cat")
+    val dogTopics = results.wordToTopicMix("dog")
 
     Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
     Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
   }
 
-  "formatSparkLDAInput" should "return input in RDD[(Long, Vector)] (collected as Array for testing) format. The index " +
-    "is the docID, values are the vectors of word occurrences in that doc" in {
-
-
-    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
-      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
-      SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
-      SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
-
-    val wordDictionary = Map("333333_7.0_0.0_1.0" -> 0, "1111111_6.0_3.0_5.0" -> 1, "-1_43_7.0_2.0_6.0" -> 2,
-      "-1_80_6.0_1.0_1.0" -> 3)
-
-    val documentDictionary: DataFrame = sparkSession.createDataFrame(documentWordData
-      .map({ case SpotLDAInput(doc, word, count) => doc })
-      .distinct
-      .zipWithIndex.map({ case (d, c) => Row(d, c) }), StructType(List(DocumentNameField, DocumentNumberField)))
-
-
-    val sparkLDAInput: RDD[(Long, Vector)] = SpotLDAWrapper.formatSparkLDAInput(documentWordData,
-      documentDictionary, wordDictionary, sparkSession)
-    val sparkLDAInArr: Array[(Long, Vector)] = sparkLDAInput.collect()
-
-    sparkLDAInArr shouldBe Array((0, Vectors.sparse(4, Array(0, 3), Array(8.0, 5.0))), (2, Vectors.sparse(4, Array
-    (2), Array(2.0))), (1, Vectors.sparse(4, Array(1), Array(4.0))))
-  }
-
-  "formatSparkLDADocTopicOutput" should "return RDD[(String,Array(Double))] after converting doc results from vector " +
-    "using PrecisionUtilityDouble: convert docID back to string, convert vector of probabilities to array" in {
-
-    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
-      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
-      SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
-      SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
-
-    val documentDictionary: DataFrame = sparkSession.createDataFrame(documentWordData
-      .map({ case SpotLDAInput(doc, word, count) => doc })
-      .distinct
-      .zipWithIndex.map({ case (d, c) => Row(d, c) }), StructType(List(DocumentNameField, DocumentNumberField)))
-
-    val docTopicDist: RDD[(Long, Vector)] = sparkSession.sparkContext.parallelize(
-      Array((0.toLong, Vectors.dense(0.15, 0.3, 0.5, 0.05)),
-        (1.toLong, Vectors.dense(0.25, 0.15, 0.4, 0.2)),
-        (2.toLong, Vectors.dense(0.4, 0.1, 0.3, 0.2))))
-
-    val sparkDocRes: DataFrame = formatSparkLDADocTopicOutput(docTopicDist, documentDictionary, sparkSession,
-      FloatPointPrecisionUtility64)
-
-    import testImplicits._
-    val documents = sparkDocRes.map({ case Row(documentName: String, docProbabilities: Seq[Double]) => (documentName,
-      docProbabilities)
-    }).collect
-
-    val documentProbabilities = sparkDocRes.select(TopicProbabilityMix).first.toSeq(0).asInstanceOf[Seq[Double]]
-
-    documents should contain("192.168.1.1", Seq(0.15, 0.3, 0.5, 0.05))
-    documents should contain("10.10.98.123", Seq(0.25, 0.15, 0.4, 0.2))
-    documents should contain("66.23.45.11", Seq(0.4, 0.1, 0.3, 0.2))
-
-    documentProbabilities(0) shouldBe a[java.lang.Double]
-
-  }
-
-  it should "return RDD[(String,Array(Float))] after converting doc results from vector " +
-    "using PrecisionUtilityFloat: convert docID back to string, convert vector of probabilities to array" in {
-
-    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
-      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
-      SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
-      SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
-
-    val documentDictionary: DataFrame = sparkSession.createDataFrame(documentWordData
-      .map({ case SpotLDAInput(doc, word, count) => doc })
-      .distinct
-      .zipWithIndex.map({ case (d, c) => Row(d, c) }), StructType(List(DocumentNameField, DocumentNumberField)))
-
-    val docTopicDist: RDD[(Long, Vector)] = sparkSession.sparkContext.parallelize(
-      Array((0.toLong, Vectors.dense(0.15, 0.3, 0.5, 0.05)),
-        (1.toLong, Vectors.dense(0.25, 0.15, 0.4, 0.2)),
-        (2.toLong, Vectors.dense(0.4, 0.1, 0.3, 0.2))))
-
-    val sparkDocRes: DataFrame = formatSparkLDADocTopicOutput(docTopicDist, documentDictionary, sparkSession,
-      FloatPointPrecisionUtility32)
-
-    import testImplicits._
-    val documents = sparkDocRes.map({ case Row(documentName: String, docProbabilities: Seq[Float]) => (documentName,
-      docProbabilities)
-    }).collect
-
-    val documentProbabilities = sparkDocRes.select(TopicProbabilityMix).first.toSeq(0).asInstanceOf[Seq[Float]]
-
-    documents should contain("192.168.1.1", Seq(0.15f, 0.3f, 0.5f, 0.05f))
-    documents should contain("10.10.98.123", Seq(0.25f, 0.15f, 0.4f, 0.2f))
-    documents should contain("66.23.45.11", Seq(0.4f, 0.1f, 0.3f, 0.2f))
-
-    documentProbabilities(0) shouldBe a[java.lang.Float]
-  }
-
-  "formatSparkLDAWordOutput" should "return Map[Int,String] after converting word matrix to sequence, wordIDs back " +
-    "to strings, and sequence of probabilities to array" in {
-    val testMat = Matrices.dense(4, 4, Array(0.5, 0.2, 0.05, 0.25, 0.25, 0.1, 0.15, 0.5, 0.1, 0.4, 0.25, 0.25, 0.7, 0.2, 0.02, 0.08))
-
-    val wordDictionary = Map("-1_23.0_7.0_7.0_4.0" -> 3, "23.0_7.0_7.0_4.0" -> 0, "333333.0_7.0_7.0_4.0" -> 2, "80.0_7.0_7.0_4.0" -> 1)
-    val revWordMap: Map[Int, String] = wordDictionary.map(_.swap)
-
-    val sparkWordRes = formatSparkLDAWordOutput(testMat, revWordMap)
-
-    sparkWordRes should contain key ("23.0_7.0_7.0_4.0")
-    sparkWordRes should contain key ("80.0_7.0_7.0_4.0")
-    sparkWordRes should contain key ("333333.0_7.0_7.0_4.0")
-    sparkWordRes should contain key ("-1_23.0_7.0_7.0_4.0")
-  }
 }
\ No newline at end of file
diff --git a/spot-oa/api/resources/flow.py b/spot-oa/api/resources/flow.py
index ab5105f..418f87c 100755
--- a/spot-oa/api/resources/flow.py
+++ b/spot-oa/api/resources/flow.py
@@ -492,8 +492,8 @@
     }
 
     #----- Add Inbound Connections-------#
+    obj["children"].append({'name': 'Inbound Only', 'children': [], 'impact': 0})
     if len(inbound) > 0:
-        obj["children"].append({'name': 'Inbound Only', 'children': [], 'impact': 0})
         in_ctxs = {}
         for ip in inbound:
             if 'nwloc' in inbound[ip] and len(inbound[ip]['nwloc']) > 0:
@@ -509,8 +509,8 @@
                 })
 
     #------ Add Outbound ----------------#
+    obj["children"].append({'name':'Outbound Only','children':[],'impact':0})
     if len(outbound) > 0:
-        obj["children"].append({'name':'Outbound Only','children':[],'impact':0})
         out_ctxs = {}
         for ip in outbound:
             if 'nwloc' in outbound[ip] and len(outbound[ip]['nwloc']) > 0:
@@ -526,8 +526,8 @@
                 })
 
     #------ Add TwoWay ----------------#
+    obj["children"].append({'name':'two way','children': [], 'impact': 0})
     if len(twoway) > 0:
-        obj["children"].append({'name':'two way','children': [], 'impact': 0})
         tw_ctxs = {}
         for ip in twoway:
             if 'nwloc' in twoway[ip] and len(twoway[ip]['nwloc']) > 0:
diff --git a/spot-oa/oa/dns/dns_oa.py b/spot-oa/oa/dns/dns_oa.py
index 5982e8b..5023d7f 100644
--- a/spot-oa/oa/dns/dns_oa.py
+++ b/spot-oa/oa/dns/dns_oa.py
@@ -232,7 +232,7 @@
                     rep_results = {k: "{0}::{1}".format(rep_results.get(k, ""), result.get(k, "")).strip('::') for k in set(rep_results) | set(result)}
 
                 if rep_results:
-                    self._dns_scores = [ conn + [ rep_results[conn[key]] ]   for conn in self._dns_scores  ]
+                    self._dns_scores = [ conn + [ rep_results.get(conn[key], "") ]    for conn in self._dns_scores  ]
                 else:
                     self._dns_scores = [ conn + [""]   for conn in self._dns_scores  ]
         else:
@@ -418,4 +418,4 @@
             query_to_insert=("""
                 INSERT INTO {0}.dns_ingest_summary PARTITION (y={1}, m={2}, d={3}) VALUES {4};
             """).format(self._db, yr, mn, dy, tuple(df_final))
-            impala.execute_query(query_to_insert)
\ No newline at end of file
+            impala.execute_query(query_to_insert)
diff --git a/spot-oa/requirements.txt b/spot-oa/requirements.txt
index 9f3afb8..1faa1b6 100644
--- a/spot-oa/requirements.txt
+++ b/spot-oa/requirements.txt
@@ -16,7 +16,7 @@
 # GraphQL API dependencies
 flask
 flask-graphql
-graphql-core
+graphql-core == 1.1.0
 urllib3
 
 # API Resources
diff --git a/spot-oa/runIpython.sh b/spot-oa/runIpython.sh
index 38a4121..26eaeff 100755
--- a/spot-oa/runIpython.sh
+++ b/spot-oa/runIpython.sh
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
diff --git a/spot-setup/hdfs_setup.sh b/spot-setup/hdfs_setup.sh
index 86a26c0..df898c8 100755
--- a/spot-setup/hdfs_setup.sh
+++ b/spot-setup/hdfs_setup.sh
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
 
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
