'master' into pr/131, SPOT-244 to close apache/incubator-spot#131
diff --git a/dev/release/README.md b/dev/release/README.md
new file mode 100644
index 0000000..fb9a18c
--- /dev/null
+++ b/dev/release/README.md
@@ -0,0 +1,474 @@
+# **Apache Spot (Incubating) Release Process**
+
+## **Overview**  
+
+This document describes the Release Process to perform the official Apache Spot (Incubating) release following the [Apache Software Foundation Release Policy](http://incubator.apache.org/guides/releasemanagement.html#best-practice). 
+ 
+### **Requirements**
+
+As a Release Manager (RM), you should create a code signing key to sign the release artifacts following this [guide](http://www.apache.org/dev/openpgp.html#generate-key).
+
+Public key should be detached and added in to the KEYS file in Spot Repo under: https://dist.apache.org/repos/dist/dev/incubator/spot/KEYS
+
+### **Policy documents**
+
+Frequently asked questions for making Apache releases are available on [Releases FAQ page](http://www.apache.org/legal/release-policy.html#releases).
+
+The Release Manager must go through the policy document to understand all the tasks and responsibilities of running a release.
+
+### **Give a heads up**
+
+The release manager should create an EPIC in Jira and then setup a timeline for release branch point. The time for the day the EPIC is created to the release branch point must be at least two weeks in order to give the community a chance to prioritize and commit any last minute features and issues they would like to see in the upcoming release.
+
+The release manager should then send the pointer to the EPIC along with the tentative timeline (Code Freeze) for branch point to the user and developer lists. Any work identified as release related that needs to be completed should be added as a subtask of the umbrella issue to allow users to see the overall release progress in one place.
+
+    To: dev@spot.apache.org
+
+    Subject: Work on Spot <your release name>  (Incubating) Release has started + Code Freeze Date
+
+
+    We are starting the process to prepare for Spot <your release name> (Incubating) release. I have opened JIRA $jira to cover the features included in this release.
+
+    If you have any JIRA in progress and would like to include in this release, please follow the process to do so. Code Freeze for final integration will be on $code_freeze_date.
+
+    Feel free to comment on the JIRA if you have any comments/suggestions.
+
+    Thanks,
+    <Release Manager Name>
+
+### **Sanitize Jira**
+
+Before a release is done, make sure that any issues that are fixed have their fix version setup correctly. If the release number is not listed in the "fix version" field, as RM create a ticket for Infrastructure asking to create the value for the "fix version" field. 
+
+Once the value is created by Infrastructure team, run the following JIRA query to see which resolved issues do not have their fix version set up correctly:
+
+project = spot and resolution = fixed and fixVersion is empty
+
+The result of the above query should be empty. If some issues do show up in this query that have been fixed since the last release, please bulk-edit them to set the fix version to '1.0'.
+
+You can also run the following query to make sure that the issues fixed for the to-be-released version look accurate:
+
+project = spot and resolution = fixed and fixVersion = '1.0'
+
+### **Monitor active issues**
+
+It is important that between the time that the umbrella issue is filed to the time when the release branch is created, no experimental or potentially destabilizing work is checked into the trunk. While it is acceptable to introduce major changes, they must be thoroughly reviewed and have good test coverage to ensure that the release branch does not start of being unstable.
+
+If necessary the RM can discuss if certain issues should be fixed on the trunk in this time, and if so what is the gating criteria for accepting them.
+
+### **Pull Request Validation**
+
+All the features that will be included in the release EPIC needs to have a proper Pull Request (PR) created following this guide. And needs three +1 votes which at least one of them must be from the QA Team. 
+
+Development must be done in to a Topic Branch or Master Branch, depending on the scope of the release.
+
+Once the PR has votes, then the PPMC developer must merge the PR and the owner of that PR should close it properly in Github (in case it does not close automatically).
+
+If Development is performed in Topic Branch, then Topic Branch should be merged in to master branch once development is done. 
+
+### **Create the Release Candidate**
+
+The Release candidate branch will be created from Master branch once all pull requests form the EPIC are merged to a topic branch or directly to master branch before code freeze date.
+
+#### **Branch your release:**
+
+* git checkout -b `<your release name>` `<commit sha1>` 
+
+push to origin:
+
+* git push origin `<your release name>`
+
+#### **Tag your Branch release:**
+Apply signed tag on release branch that will indicate where the release candidate was generated.
+
+Example:
+* git tag -u `<GPG KEY ID>` --sign `<your release name>`-incubating -m "Apache Spot `<your release name>` (Incubating)" `<SHA of HEAD of branch>`
+
+### **Run RAT**
+Apache Rat is a release audit tool, focused on licenses. Used to improve accuracy and efficiency when checking releases for licenses.
+
+Download RAT: 
+* wget http://apache.claz.org//creadur/apache-rat-0.12/apache-rat-0.12-bin.tar.gz
+
+Decompress the code:
+* tar -zxvf apache-rat-0.12-bin.tar.gz
+* cd apache-rat-0.12
+
+Now lets create the file to exclude the known extensions and files:
+* vi or nano .rat-excludes
+
+Add the following exclude list:
+
+    .*md
+    .*txt
+    .gitignore
+    .gitmodules
+    .*png
+    .*json
+    .*csvss
+    .*less
+    .*ipynb
+    .babelrc
+    topojson.min.js
+
+Save the File.
+
+Or download the .rat-excludes files from: https://github.com/apache/incubator-spot/blob/master/dev/release/.rat-excludes
+
+Run the rat tool as following.
+* java -jar apache-rat-0.12.jar -E /path/to/project/.rat-excludes -d /path/to/project/ > `<to output file>`.txt
+
+If you have rat in the same directory as the Spot Code you can verify as:
+* java -jar apache-rat-0.12.jar -E .rat-excludes -d ../apache-spot-1.0-incubating > apache-spot-1.0-incubating-rat-results.txt
+
+If RAT find problems in the licenses please fix the licence and run RAT again developers must fix their code and submit changes into the release branch, once there are no more findings. Upload RAT Results into subversion dev incubator repo for Spot of the release
+
+#### **Make a tarball and gzip:**
+* git archive -o ../apache-spot-`<your release name>`-incubating.tar --prefix=apache-spot-`<your release name>`-incubating/ `<your tag/branch name>`
+* gzip ../apache-spot-`<your release name>`-incubating.tar
+
+Example:
+
+    $ git archive -o ../apache-spot-1.0-incubating.tar --prefix=apache-spot-1.0-incubating/ 1.0-incubating
+    $ gzip ../apache-spot-1.0-incubating.tar
+
+#### **Prepare MD5, SHA512 and ASC files from the source tarball:**
+
+* md5 apache-spot-`<your release name>`-incubating.tar.gz > apache-spot-`<your release name>`-incubating.tar.gz.md5
+* shasum -a 512 apache-spot-`<your release name>`-incubating.tar.gz > apache-spot-`<your release name>`-incubating.tar.gz.sha512 
+* gpg2 --detach-sign -a apache-spot-`<your release name>`-incubating.tar.gz
+
+Example:
+
+    $ md5 apache-spot-1.0-incubating..tar.gz > apache-spot-1.0-incubating.tar.gz.md5
+    $ shasum -a 512 apache-spot-1.0-incubating.tar.gz > apache-spot-1.0-incubating.tar.gz.sha512
+    $ gpg2 --detach-sign -a apache-spot-1.0-incubating..tar.gz
+ 
+
+#### **Retrieve the subversion dev incubator repo for Spot**
+
+Example:
+* svn checkout https://dist.apache.org/repos/dist/dev/incubator/spot/ --username=`<your apache user>`
+ 
+Create a local folder for the release (e.g. 1.0-incubating) in svn. 
+* svn mkdir -m "Creating Spot `<release number>` dir" https://dist.apache.org/repos/dist/dev/incubator/spot/`<release number>` --username=`<your apache user>`
+
+Example:
+
+    svn mkdir -m "Creating Spot 1.0-incubating dir" https://dist.apache.org/repos/dist/dev/incubator/spot/1.0-incuabting --username=`<your apache user>`
+
+
+Move the files into the release folder on local disk.
+* svn add `<release folder>`
+
+Example:
+
+    svn add 1.0-incubating/
+
+Commit artifacts:
+* svn commit -m '`<custom message>`' --username=`<your apache user id>`
+
+Example:
+
+    svn commit -m 'adding spot 1.0-incubating candidate release artifacts' --username=`<your apache user id>`
+
+## **Validate the Build**
+
+Download the tarball.
+
+* http://spot.apache.org/download
+
+Decompress the tarball. Instruction:
+    
+    tar -zxvf apache-spot-1.0-incubating.tar.gz
+
+Change directory. Instruction:
+    
+    cd apache-spot-1.0-incubator
+
+Apache Spot (incubating) is composed of more than one module or sub-projects. Since some of them are Python or Javascript code, they don’t need compilation.
+
+For more instructions about how to install each module please read below instructions.
+
+ You should see the content of the folder:
+
+    spotadmin-mac01:apache-spot-1.0-incubating spotadmin$ ls -la
+    total 72
+    drwxr-xr-x 14 spotadmin staff 476 Jul 24 16:45 .
+    drwxr-xr-x 7 spotadmin staff 238 Aug 4 09:32 ..
+    -rw-r--r-- 1 spotadmin staff 20 Jul 24 16:45 .gitignore
+    -rw-r--r-- 1 spotadmin staff 0 Jul 24 16:45 .gitmodules
+    -rw-r--r-- 1 spotadmin staff 560 Jul 24 16:45 DISCLAIMER
+    -rw-r--r-- 1 spotadmin staff 11918 Jul 24 16:45 LICENSE
+    -rw-r--r-- 1 spotadmin staff 1493 Jul 24 16:45 LICENSE-topojson.txt
+    -rw-r--r-- 1 spotadmin staff 159 Jul 24 16:45 NOTICE
+    -rw-r--r-- 1 spotadmin staff 6761 Jul 24 16:45 README.md
+    drwxr-xr-x 3 spotadmin staff 102 Jul 24 16:45 docs
+    drwxr-xr-x 10 spotadmin staff 340 Jul 24 16:45 spot-ingest
+    drwxr-xr-x 13 spotadmin staff 442 Jul 24 16:45 spot-ml
+    drwxr-xr-x 11 spotadmin staff 374 Jul 24 16:45 spot-oa
+    drwxr-xr-x 10 spotadmin staff 340 Jul 24 16:45 spot-setup
+
+Decompressed tarball content should be the same with the content located in: 
+
+* [https://github.com/apache/incubator-spot/tree/v1.0-incubating](https://github.com/apache/incubator-spot/tree/v1.0-incubating)
+
+To install the properly component please follow this guide:
+
+* [http://spot.apache.org/doc/#installation](http://spot.apache.org/doc/#installation)
+
+Spot Ingest, Spot Setup, Spot OA and Spot UI have specific requirements to install manually.
+    
+* [http://spot.apache.org/doc/#configuration](http://spot.apache.org/doc/#configuration)
+* [http://spot.apache.org/doc/#ingest](http://spot.apache.org/doc/#ingest)
+* [http://spot.apache.org/doc/#oa](http://spot.apache.org/doc/#oa)
+* [http://spot.apache.org/doc/#ui](http://spot.apache.org/doc/#ui)
+
+Spot ML is the only component to build the binary files using sbt assembly commands. Please follows these instructions.
+    
+* [http://spot.apache.org/doc/#ml](http://spot.apache.org/doc/#ml)
+
+
+## **Running the Vote**
+
+As per the Apache Incubator release [guidelines](http://incubator.apache.org/policy/incubation.html#Releases), all releases for incubating projects must go through a two-step voting process. First, release voting must successfully pass within the Apache Spot (Incubating) community via the dev@spot.incubator.apache.org mail list. Then, release voting must successfully pass within the Apache Incubator PMC via the general@incubator.apache.org mail list.
+
+### **Call for Spot Community Vote**
+
+Call for Vote in spot dev community sending an email to dev list.
+
+For example,
+
+    To: dev@spot.apache.org
+    Subject: [VOTE] Release Apache Spot 1.0-incubating
+
+    Hi All, 
+
+    This is the vote for Apache Spot 1.0 (incubating) release.
+
+    The vote will run for at least 72 hours and will close on July 27,2017.
+
+    Release Notes (Jira generated):
+    https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12320637&version=12340668
+
+    Git Branch and Tag for the release:
+    https://github.com/apache/incubator-spot/tree/branch-1.0
+    https://github.com/apache/incubator-spot/tree/v1.0-incubating
+
+    Source code for the release:
+    https://dist.apache.org/repos/dist/dev/incubator/spot/1.0-incubating/apache-spot-1.0-incubating.tar.gz
+    
+    Source release verification:
+    PGP Signature:
+    https://dist.apache.org/repos/dist/dev/incubator/spot/1.0-incubating/apache-spot-1.0-incubating.tar.gz.asc
+
+    MD5/SHA512 Hash:
+    https://dist.apache.org/repos/dist/dev/incubator/spot/1.0-incubating/apache-spot-1.0-incubating.tar.gz.md5
+    https://dist.apache.org/repos/dist/dev/incubator/spot/1.0-incubating/apache-spot-1.0-incubating.tar.gz.sha512
+
+    RAT license Verification:
+    https://dist.apache.org/repos/dist/dev/incubator/spot/1.0-incubating/apache-spot-1.0-incubating-rat-results.txt 
+
+    Keys to verify the signature of the release artifact are available at:
+    https://dist.apache.org/repos/dist/dev/incubator/spot/KEYS
+
+    The artifact(s) have been signed with Key : 06B82CAEDB5B280349E75D5533CD9431141E946C
+
+    Download the release candidate and evaluate the necessary items including checking hashes, signatures, source code and test.
+    Please vote accordingly:
+    [ ] +1 approve
+    [ ] +0 no opinion
+    [ ] -1 disapprove (and reason why)
+
+    =================
+    DISCLAIMER
+
+    Apache Spot (incubating) is an effort undergoing incubation at the Apache Software Foundation (ASF), sponsored by the Apache Incubator PMC.
+    Incubation is required of all newly accepted projects until a further review indicates that the infrastructure, communications, and decision making process have stabilized in a manner consistent with other successful ASF projects.
+
+    While incubation status is not necessarily a reflection of the completeness or stability of the code, it does indicate that the project has yet to be fully endorsed by the ASF.
+    =================
+
+    --
+    Best Regards!
+    -----------------------------------
+    <Release Manager Name>
+    http://spot.apache.org/
+    -----------------------------------
+
+Allow the community to vote and any -1 vote with comments please fix ASAP and send findings to the thread until the required votes are reached by the PPMC members of Spot.
+
+Send a following email with the results that should include the counting votes and thread of the results listed in http://lists.apache.org
+
+    To: dev@spot.apache.org
+    Subject: [RESULT][VOTE] Release Apache Spot 1.0-incubating
+
+    Hi All, 
+
+    The voting process for the Release Apache Spot 1.0-incubating is now closed with the following and positive results:
+
+    [10] Binding Votes
+    [1] Non-binding
+
+    Thread of the voting email with responses can be found here:
+
+    https://lists.apache.org/thread.html/69dfe2626c7b803e2a3f26e4d348be8d1941003f0e8166fb8e0e9679@%3Cdev.spot.apache.org%3E
+
+    The next step will be sending the release artifacts for voting at the Incubator General list to get the IPMC approval to officially declare the release. 
+
+
+    Thanks
+    --
+    Best Regards!
+    -----------------------------------
+    <Release Manager Name>
+    http://spot.apache.org/
+    -----------------------------------
+
+### **Call for Incubator PMC Vote**
+
+The second voting is the most important since it is required to get three +1 (Binding) vote from the IPMC members from the Incubator General list to declare an official release.
+
+Send the vote to the general Incubator list and include the voting results from the dev list as evidence. 
+
+    To: general@incubator.apache.org
+    Subject: [VOTE] Release Apache Spot 1.0-incubating
+
+    Dear IPMC team,
+
+    This is the vote for Apache Spot 1.0 (incubating) release. This is the first release of Spot.
+
+    Apache Spot (Incubating) is open source software for leveraging insights from flow and packet analysis. It helps enterprises and service providers gain insight on their network environments through transparency of service delivery and identification of potential security threats or attacks happening among resources operating at cloud scale. While current threat intelligence tools help, identifying unknown threats and attacks remains a challenge. Apache Spot provides tools to accelerate companies’ ability to expose suspicious connections and previously unseen attacks using flow and packet analysis technologies.
+
+    The PPMC Vote Threads can be found here:
+    https://lists.apache.org/thread.html/69dfe2626c7b803e2a3f26e4d348be8d1941003f0e8166fb8e0e9679@%3Cdev.spot.apache.org%3E
+
+
+    The PPMC vote results can be found here:
+    https://lists.apache.org/thread.html/a88ef44e0dcda9013781eeca363ad9b3439f6c34a698c6eaa50fb314@%3Cdev.spot.apache.org%3E 
+
+
+    Release Notes (Jira generated):
+    https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12320637&version=12340668
+
+    Git Branch and Tag for the release:
+    https://github.com/apache/incubator-spot/tree/branch-1.0
+    https://github.com/apache/incubator-spot/tree/v1.0-incubating
+
+    Source code for the release:
+    https://dist.apache.org/repos/dist/dev/incubator/spot/1.0-incubating/apache-spot-1.0-incubating.tar.gz
+    
+    Source release verification:
+    http://nolamarketing.com/client/apache-spot/download/
+
+    PGP Signature:
+    https://dist.apache.org/repos/dist/dev/incubator/spot/1.0-incubating/apache-spot-1.0-incubating.tar.gz.asc
+
+    MD5/SHA512 Hash:
+    https://dist.apache.org/repos/dist/dev/incubator/spot/1.0-incubating/apache-spot-1.0-incubating.tar.gz.md5
+    https://dist.apache.org/repos/dist/dev/incubator/spot/1.0-incubating/apache-spot-1.0-incubating.tar.gz.sha512
+    
+    RAT license Verification:
+    https://dist.apache.org/repos/dist/dev/incubator/spot/1.0-incubating/apache-spot-1.0-incubating-rat-results.txt 
+
+    Keys to verify the signature of the release artifact are available at:
+    https://dist.apache.org/repos/dist/dev/incubator/spot/KEYS
+
+    The artifact(s) have been signed with Key : 06B82CAEDB5B280349E75D5533CD9431141E946C
+
+    Download the release candidate and evaluate the necessary items.
+
+    Please vote accordingly:
+    [ ] +1, approve as the official Apache Spot 1.0-incubating release
+    [ ] -1, do not accept as the official as the official Apache Spot 1.0-incubating release because...
+
+    The vote will run for at least 72 hours or until necessary number of votes are reached.
+
+    =================
+    DISCLAIMER
+
+    Apache Spot (incubating) is an effort undergoing incubation at the Apache Software Foundation (ASF), sponsored by the Apache Incubator PMC.
+    Incubation is required of all newly accepted projects until a further review indicates that the infrastructure, communications, and decision making process have stabilized in a manner consistent with other successful ASF projects.
+
+    While incubation status is not necessarily a reflection of the completeness or stability of the code, it does indicate that project has yet to be fully endorsed by the ASF.
+    =================
+
+    --
+    Best Regards!
+    -----------------------------------
+    <Release Manager Name>
+    http://spot.apache.org/
+    -----------------------------------
+
+Monitor the voting thread and make sure you have the required votes, if feedback is provides fix ASAP and send the updates to the voting thread so voting reaches the required votes.
+
+Once we have the three +1 (Binding) votes then send the following email with the results:
+
+
+    To: general@incubator.apache.org
+    Subject: [RESULT][VOTE] Release Apache Spot 1.0-incubating
+
+    Hi All, 
+
+    The voting process for the Release Apache Spot 1.0-incubating is now closed with the following and positive results:
+
+    [3] Binding Votes
+    [1] Non-binding
+
+    Thread of the voting email with responses can be found here:
+
+    https://lists.apache.org/thread.html/32d7c93fe66cc256ed12a5b8f91b57b1d0d659b9012c8f4f13c11191@%3Cgeneral.incubator.apache.org%3E
+
+    
+    I will prepare the artifacts to officially release Apache Spot 1.0-incubating. 
+
+
+    Thanks
+    --
+    Best Regards!
+    -----------------------------------
+    <Release Manager Name>
+    http://spot.apache.org/
+    -----------------------------------
+
+
+Moving the Artifacts to release stage in SVN using the following command.
+* svn move -m "`<comment>`" `<Directory Origin>` `<Directory Destination>` --username=`<your apache user id>`
+
+Example:
+    
+    svn move -m "Moving Apache Spot 1.0-incubating release artifacts to release stage" /* https://dist.apache.org/repos/dist/dev/incubator/spot/1.0-incuabting https://dist.apache.org/repos/dist/release/incubator/spot/1.0-incuabting --username=`<your apache user id>`
+
+Allow 24 hours before updating the webpage and announcing the new Release in Apache Spot (Incubating) webpage. http://nolamarketing.com/client/apache-spot/download/
+
+### **Update WebPages**
+
+You need to update the Spot webpages to reflect the new release.
+
+### **Announce the release**
+
+Email to the different distribution lists announce@apache.org, user@spot.apache.org, dev@spot.apache.org (using your @apache.org email) For example:
+
+    To: announce@apache.org, user@spot.apache.org, dev@spot.apache.org
+    Subject: [ANNOUNCE] Apache Spot 1.0 (incubating) released
+
+    The Apache Spot (Incubating) team is pleased to announce the release of Spot 1.0-incubating.
+
+    This is the first release of Spot. Major step forward of the project.
+
+    Apache Spot (Incubating) is open source software for leveraging insights from flow and packet analysis. It helps enterprises and service providers gain insight on their network environments through transparency of service delivery and identification of potential security threats or attacks happening among resources operating at cloud scale. While current threat intelligence tools help, identifying unknown threats and attacks remains a challenge.
+
+    The release is available here:
+    http://nolamarketing.com/client/apache-spot/download/ (Update from final version)
+
+    The full change log is available here:
+    https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12320637&version=12340668
+
+    Your help and feedback is more than welcome. For more information on how to report problems and to get involved, visit the project website at http://spot.apache.org/.
+
+    The Apache Spot (Incubating) Team
+
+
+### **Close the Jira Ticket**
+
+Once the release is announced you can go to Jira and Close the EPIC created to perform the release.
\ No newline at end of file
diff --git a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
index dfcb543..7245acf 100644
--- a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
@@ -19,22 +19,18 @@
 
 import org.apache.log4j.Logger
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.dns.DNSSchema._
 import org.apache.spot.dns.DNSWordCreation
-import org.apache.spot.lda.SpotLDAWrapper
-import org.apache.spot.lda.SpotLDAWrapper._
 import org.apache.spot.lda.SpotLDAWrapperSchema._
+import org.apache.spot.lda._
 import org.apache.spot.utilities.DomainProcessor.DomainInfo
 import org.apache.spot.utilities._
 import org.apache.spot.utilities.data.validation.InvalidDataHandler
 
-import scala.util.{Failure, Success, Try}
-
 
 /**
   * A probabilistic model of the DNS queries issued by each client IP.
@@ -50,17 +46,17 @@
   *
   * Create these models using the  factory in the companion object.
   *
-  * @param inTopicCount          Number of topics to use in the topic model.
-  * @param inIpToTopicMix        Per-IP topic mix.
-  * @param inWordToPerTopicProb  Per-word,  an array of probability of word given topic per topic.
+  * @param inTopicCount         Number of topics to use in the topic model.
+  * @param inIpToTopicMix       Per-IP topic mix.
+  * @param inWordToPerTopicProb Per-word,  an array of probability of word given topic per topic.
   */
 class DNSSuspiciousConnectsModel(inTopicCount: Int,
                                  inIpToTopicMix: DataFrame,
                                  inWordToPerTopicProb: Map[String, Array[Double]]) {
 
-  val topicCount = inTopicCount
-  val ipToTopicMix = inIpToTopicMix
-  val wordToPerTopicProb = inWordToPerTopicProb
+  val topicCount: Int = inTopicCount
+  val ipToTopicMix: DataFrame = inIpToTopicMix
+  val wordToPerTopicProb: Map[String, Array[Double]] = inWordToPerTopicProb
 
   /**
     * Use a suspicious connects model to assign estimated probabilities to a dataframe of
@@ -128,7 +124,7 @@
     QueryTypeField,
     QueryResponseCodeField))
 
-  val modelColumns = ModelSchema.fieldNames.toList.map(col)
+  val modelColumns: List[Column] = ModelSchema.fieldNames.toList.map(col)
 
   val DomainStatsSchema = StructType(List(TopDomainField, SubdomainLengthField, SubdomainEntropyField, NumPeriodsField))
 
@@ -136,7 +132,7 @@
     * Create a new DNS Suspicious Connects model by training it on a data frame and a feedback file.
     *
     * @param sparkSession Spark Session
-    * @param logger
+    * @param logger       Application logger
     * @param config       Analysis configuration object containing CLI parameters.
     *                     Contains the path to the feedback file in config.scoresFile
     * @param inputRecords Data used to train the model.
@@ -155,7 +151,6 @@
       config.feedbackFile,
       config.duplicationFactor))
 
-    val countryCodesBC = sparkSession.sparkContext.broadcast(CountryCodes.CountryCodes)
     val topDomainsBC = sparkSession.sparkContext.broadcast(TopDomains.TopDomains)
     val userDomain = config.userDomain
 
@@ -175,19 +170,20 @@
         .reduceByKey(_ + _)
         .map({ case ((ipDst, word), count) => SpotLDAInput(ipDst, word, count) })
 
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(ipDstWordCounts, config.precisionUtility, sparkSession)
 
-    val SpotLDAOutput(ipToTopicMix, wordToPerTopicProb) = SpotLDAWrapper.runLDA(sparkSession,
-      ipDstWordCounts,
-      config.topicCount,
+    val model: SpotLDAModel = SpotLDAWrapper.run(config.topicCount,
       logger,
       config.ldaPRGSeed,
       config.ldaAlpha,
       config.ldaBeta,
       config.ldaOptimizer,
       config.ldaMaxiterations,
-      config.precisionUtility)
+      spotLDAHelper)
 
-    new DNSSuspiciousConnectsModel(config.topicCount, ipToTopicMix, wordToPerTopicProb)
+    val results: SpotLDAResult = model.predict(spotLDAHelper)
+
+    new DNSSuspiciousConnectsModel(config.topicCount, results.documentToTopicMix, results.wordToTopicMix)
 
   }
 
@@ -205,15 +201,16 @@
                        userDomain: String,
                        url: String): TempFields = {
 
-    val DomainInfo(_, topDomainClass, subdomain, subdomainLength, subdomainEntropy, numPeriods) =
+    val DomainInfo(_, topDomainClass, _, subDomainLength, subDomainEntropy, numPeriods) =
       DomainProcessor.extractDomainInfo(url, topDomainsBC, userDomain)
 
 
     TempFields(topDomainClass = topDomainClass,
-      subdomainLength = subdomainLength,
-      subdomainEntropy = subdomainEntropy,
+      subdomainLength = subDomainLength,
+      subdomainEntropy = subDomainEntropy,
       numPeriods = numPeriods)
   }
 
   case class TempFields(topDomainClass: Int, subdomainLength: Integer, subdomainEntropy: Double, numPeriods: Integer)
+
 }
\ No newline at end of file
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAHelper.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAHelper.scala
new file mode 100644
index 0000000..e9f0b66
--- /dev/null
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAHelper.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spot.lda
+
+import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.udf
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spot.lda.SpotLDAWrapperSchema._
+import org.apache.spot.utilities.{FloatPointPrecisionUtility, FloatPointPrecisionUtility64}
+
+import scala.collection.immutable.Map
+
+/**
+  * Apache Spot routines to format Spark LDA input and output for scoring.
+  */
+class SpotLDAHelper(private final val sparkSession: SparkSession,
+                    final val docWordCount: RDD[SpotLDAInput],
+                    private final val documentDictionary: DataFrame,
+                    private final val wordDictionary: Map[String, Int],
+                    private final val precisionUtility: FloatPointPrecisionUtility = FloatPointPrecisionUtility64)
+  extends Serializable {
+
+  /**
+    * Format document word count as RDD[(Long, Vector)] - input data for LDA algorithm
+    *
+    * @return RDD[(Long, Vector)]
+    */
+  val formattedCorpus: RDD[(Long, Vector)] = {
+    import sparkSession.implicits._
+
+    val getWordId = {
+      udf((word: String) => wordDictionary(word))
+    }
+
+    val docWordCountDF = docWordCount
+      .map({ case SpotLDAInput(doc, word, count) => (doc, word, count) })
+      .toDF(DocumentName, WordName, WordNameWordCount)
+
+    // Convert SpotSparkLDAInput into desired format for Spark LDA: (doc, word, count) -> word count per doc, where RDD
+    // is indexed by DocID
+    val wordCountsPerDocDF = docWordCountDF
+      .join(documentDictionary, docWordCountDF(DocumentName) === documentDictionary(DocumentName))
+      .drop(documentDictionary(DocumentName))
+      .withColumn(WordNumber, getWordId(docWordCountDF(WordName)))
+      .drop(WordName)
+
+    val wordCountsPerDoc: RDD[(Long, Iterable[(Int, Double)])]
+    = wordCountsPerDocDF
+      .select(DocumentNumber, WordNumber, WordNameWordCount)
+      .rdd
+      .map({ case Row(documentId: Long, wordId: Int, wordCount: Int) => (documentId.toLong, (wordId, wordCount.toDouble)) })
+      .groupByKey
+
+    // Sum of distinct words in each doc (words will be repeated between different docs), used for sparse vec size
+    val numUniqueWords = wordDictionary.size
+    val ldaInput: RDD[(Long, Vector)] = wordCountsPerDoc
+      .mapValues(vs => Vectors.sparse(numUniqueWords, vs.toSeq))
+
+    ldaInput
+  }
+
+  /**
+    * Format LDA output topicDistribution for spot-ml scoring
+    *
+    * @param documentDistributions LDA model topicDistributions
+    * @return DataFrame
+    */
+  def formatDocumentDistribution(documentDistributions: RDD[(Long, Vector)]): DataFrame = {
+    import sparkSession.implicits._
+
+    val topicDistributionToArray = udf((topicDistribution: Vector) => topicDistribution.toArray)
+    val documentToTopicDistributionDF = documentDistributions.toDF(DocumentNumber, TopicProbabilityMix)
+
+    val documentToTopicDistributionArray = documentToTopicDistributionDF
+      .join(documentDictionary, documentToTopicDistributionDF(DocumentNumber) === documentDictionary(DocumentNumber))
+      .drop(documentDictionary(DocumentNumber))
+      .drop(documentToTopicDistributionDF(DocumentNumber))
+      .select(DocumentName, TopicProbabilityMix)
+      .withColumn(TopicProbabilityMixArray, topicDistributionToArray(documentToTopicDistributionDF(TopicProbabilityMix)))
+      .selectExpr(s"$DocumentName  AS $DocumentName", s"$TopicProbabilityMixArray AS $TopicProbabilityMix")
+
+    precisionUtility.castColumn(documentToTopicDistributionArray, TopicProbabilityMix)
+  }
+
+  /**
+    * Format LDA output topicMatrix for spot-ml scoring
+    *
+    * @param topicsMatrix LDA model topicMatrix
+    * @return Map[String, Array[Double]]
+    **/
+  def formatTopicDistributions(topicsMatrix: Matrix): Map[String, Array[Double]] = {
+    // Incoming word top matrix is in column-major order and the columns are unnormalized
+    val m = topicsMatrix.numRows
+    val n = topicsMatrix.numCols
+    val reverseWordDictionary = wordDictionary.map(_.swap)
+
+    val columnSums: Array[Double] = Range(0, n).map(j => Range(0, m).map(i => topicsMatrix(i, j)).sum).toArray
+
+    val wordProbabilities: Seq[Array[Double]] = topicsMatrix.transpose.toArray.grouped(n).toSeq
+      .map(unNormalizedProbabilities => unNormalizedProbabilities.zipWithIndex.map({ case (u, j) => u / columnSums(j) }))
+
+    wordProbabilities.zipWithIndex
+      .map({ case (topicProbabilities, wordInd) => (reverseWordDictionary(wordInd), topicProbabilities) }).toMap
+  }
+
+}
+
+object SpotLDAHelper {
+
+  /**
+    * Factory method for SpotLDAHelper new instance.
+    *
+    * @param docWordCount Document word count.
+    * @param precisionUtility
+    * @param sparkSession
+    * @return
+    */
+  def apply(docWordCount: RDD[SpotLDAInput],
+            precisionUtility: FloatPointPrecisionUtility,
+            sparkSession: SparkSession): SpotLDAHelper = {
+
+    import sparkSession.implicits._
+
+    val docWordCountCache = docWordCount.cache()
+
+    // Forcing an action to cache results.
+    docWordCountCache.count()
+
+    // Create word Map Word,Index for further usage
+    val wordDictionary: Map[String, Int] = {
+      val words = docWordCountCache
+        .map({ case SpotLDAInput(_, word, _) => word })
+        .distinct
+        .collect
+      words.zipWithIndex.toMap
+    }
+
+    val documentDictionary: DataFrame = docWordCountCache
+      .map({ case SpotLDAInput(doc, _, _) => doc })
+      .distinct
+      .zipWithIndex
+      .toDF(DocumentName, DocumentNumber)
+      .cache
+
+    new SpotLDAHelper(sparkSession, docWordCount, documentDictionary, wordDictionary, precisionUtility)
+  }
+
+}
+
+/**
+  * Spot LDA input case class
+  *
+  * @param doc   Document name.
+  * @param word  Word.
+  * @param count Times the word appears for the document.
+  */
+case class SpotLDAInput(doc: String, word: String, count: Int) extends Serializable
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAModel.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAModel.scala
new file mode 100644
index 0000000..181dc62
--- /dev/null
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAModel.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spot.lda
+
+import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDAModel, LocalLDAModel}
+import org.apache.spark.sql.SparkSession
+
+/**
+  * Spot LDAModel.
+  */
+sealed trait SpotLDAModel {
+
+  /**
+    * Save the model to HDFS
+    *
+    * @param sparkSession
+    * @param location
+    */
+  def save(sparkSession: SparkSession, location: String): Unit
+
+  /**
+    * Predict topicDistributions and get topicsMatrix along with results formatted for Apache Spot scoring
+    *
+    * @param helper
+    * @return
+    */
+  def predict(helper: SpotLDAHelper): SpotLDAResult
+}
+
+/**
+  * Spark LocalLDAModel wrapper.
+  *
+  * @param ldaModel Spark LDA Model
+  */
+class SpotLocalLDAModel(final val ldaModel: LDAModel) extends SpotLDAModel {
+
+  /**
+    * Save LocalLDAModel on HDFS location
+    *
+    * @param sparkSession the Spark session
+    * @param location     the HDFS location
+    */
+  override def save(sparkSession: SparkSession, location: String): Unit = {
+    val sparkContext = sparkSession.sparkContext
+
+    ldaModel.save(sparkContext, location)
+  }
+
+  /**
+    * Predict topicDistributions and get topicsMatrix along with results formatted for Apache Spot scoring.
+    * SpotLocalLDAModel.predict will use corpus from spotLDAHelper which can be a new set of documents or the same
+    * documents used for training.
+    *
+    * @param spotLDAHelper Spot LDA Helper object, can be the same used for training or a new instance with new
+    *                      documents.
+    * @return SpotLDAResult
+    */
+  override def predict(spotLDAHelper: SpotLDAHelper): SpotLDAResult = {
+
+    val localLDAModel: LocalLDAModel = ldaModel.asInstanceOf[LocalLDAModel]
+
+    val topicDistributions = localLDAModel.topicDistributions(spotLDAHelper.formattedCorpus)
+    val topicMix = localLDAModel.topicsMatrix
+
+    SpotLDAResult(spotLDAHelper, topicDistributions, topicMix)
+  }
+}
+
+/** Spark DistributedLDAModel wrapper.
+  * Ideally, this model should be used only for batch processing.
+  *
+  * @param ldaModel Spark LDA Model
+  */
+class SpotDistributedLDAModel(final val ldaModel: LDAModel) extends
+  SpotLDAModel {
+
+  /**
+    * Save DistributedLDAModel on HDFS location
+    *
+    * @param sparkSession the Spark session
+    * @param location     the HDFS location
+    */
+  override def save(sparkSession: SparkSession, location: String): Unit = {
+    val sparkContext = sparkSession.sparkContext
+
+    ldaModel.save(sparkContext, location)
+  }
+
+  /**
+    * Predict topicDistributions and get topicsMatrix along with results formatted for Apache Spot scoring.
+    * SpotDistributeLDAModel.predict will use same documents that were used for training, can't predict on new
+    * documents. When passing spotLDAHelper we recommend to make sure it's the same object it was passed for training.
+    *
+    * @param spotLDAHelper Spot LDA Helper object used for training
+    * @return SpotLDAResult
+    */
+  override def predict(spotLDAHelper: SpotLDAHelper): SpotLDAResult = {
+
+    val distributedLDAModel: DistributedLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]
+
+    val topicDistributions = distributedLDAModel.topicDistributions
+    val topicsMatrix = distributedLDAModel.topicsMatrix
+
+    SpotLDAResult(spotLDAHelper, topicDistributions, topicsMatrix)
+  }
+}
+
+object SpotLDAModel {
+
+  /**
+    * Factory method, based on instance of ldaModel will generate an object based on DistributedLDAModel
+    * implementation or LocalLDAModel.
+    *
+    * @param ldaModel Spark LDAModel
+    * @return
+    */
+  def apply(ldaModel: LDAModel): SpotLDAModel = {
+
+    ldaModel match {
+      case model: DistributedLDAModel => new SpotDistributedLDAModel(model)
+      case model: LocalLDAModel => new SpotLocalLDAModel(model)
+    }
+  }
+}
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAResult.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAResult.scala
new file mode 100644
index 0000000..a91cee2
--- /dev/null
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAResult.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spot.lda
+
+import org.apache.spark.mllib.linalg.{Matrix, Vector}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+
+/**
+  * LDA results formatted for Apache Spot scoring.
+  *
+  */
+class SpotLDAResult(private final val helper: SpotLDAHelper,
+                    final val topicDistributions: RDD[(Long, Vector)],
+                    final val documentToTopicMix: DataFrame,
+                    final val topicsMix: Matrix,
+                    final val wordToTopicMix: Map[String, Array[Double]])
+
+object SpotLDAResult {
+
+  def apply(helper: SpotLDAHelper, topicDistributions: RDD[(Long, Vector)], topicsMix: Matrix): SpotLDAResult = {
+
+    val documentToTopicMix: DataFrame = helper.formatDocumentDistribution(topicDistributions)
+    val wordToTopicMix: Map[String, Array[Double]] = helper.formatTopicDistributions(topicsMix)
+
+    new SpotLDAResult(helper, topicDistributions, documentToTopicMix, topicsMix, wordToTopicMix)
+  }
+}
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
index 122e8ed..7a8b67e 100644
--- a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
@@ -18,19 +18,15 @@
 package org.apache.spot.lda
 
 import org.apache.log4j.Logger
-import org.apache.spark.mllib.clustering._
-import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.clustering.{LDAModel, _}
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spot.lda.SpotLDAWrapperSchema._
-import org.apache.spot.utilities.FloatPointPrecisionUtility
-
-import scala.collection.immutable.Map
+import org.apache.spark.sql.SparkSession
 
 /**
   * Spark LDA implementation
-  * Contains routines for LDA using Scala Spark implementation from mllib
+  * Contains routines for LDA using Scala Spark implementation from org.apache.spark.mllib.clustering
   * 1. Creates list of unique documents, words and model based on those two
   * 2. Processes the model using Spark LDA
   * 3. Reads Spark LDA results: Topic distributions per document (docTopicDist) and word distributions per topic (wordTopicMat)
@@ -42,8 +38,6 @@
   /**
     * Runs Spark LDA and returns a new model.
     *
-    * @param sparkSession       the SparkSession
-    * @param docWordCount       RDD with document list and the word count for each document (corpus)
     * @param topicCount         number of topics to find
     * @param logger             application logger
     * @param ldaSeed            LDA seed
@@ -51,51 +45,20 @@
     * @param ldaBeta            topic concentration
     * @param ldaOptimizerOption LDA optimizer, em or online
     * @param maxIterations      maximum number of iterations for the optimizer
-    * @param precisionUtility   FloatPointPrecisionUtility implementation based on user configuration (64 or 32 bit)
     * @return
     */
-  def runLDA(sparkSession: SparkSession,
-             docWordCount: RDD[SpotLDAInput],
-             topicCount: Int,
-             logger: Logger,
-             ldaSeed: Option[Long],
-             ldaAlpha: Double,
-             ldaBeta: Double,
-             ldaOptimizerOption: String,
-             maxIterations: Int,
-             precisionUtility: FloatPointPrecisionUtility): SpotLDAOutput = {
+  def run(topicCount: Int,
+          logger: Logger,
+          ldaSeed: Option[Long],
+          ldaAlpha: Double,
+          ldaBeta: Double,
+          ldaOptimizerOption: String,
+          maxIterations: Int,
+          helper: SpotLDAHelper): SpotLDAModel = {
 
-    import sparkSession.implicits._
-
-    val docWordCountCache = docWordCount.cache()
-
-    // Forcing an action to cache results.
-    docWordCountCache.count()
-
-    // Create word Map Word,Index for further usage
-    val wordDictionary: Map[String, Int] = {
-      val words = docWordCountCache
-        .map({ case SpotLDAInput(doc, word, count) => word })
-        .distinct
-        .collect
-      words.zipWithIndex.toMap
-    }
-
-    val documentDictionary: DataFrame = docWordCountCache
-      .map({ case SpotLDAInput(doc, word, count) => doc })
-      .distinct
-      .zipWithIndex
-      .toDF(DocumentName, DocumentNumber)
-      .cache
 
     // Structure corpus so that the index is the docID, values are the vectors of word occurrences in that doc
-    val ldaCorpus: RDD[(Long, Vector)] =
-      formatSparkLDAInput(docWordCountCache,
-        documentDictionary,
-        wordDictionary,
-        sparkSession)
-
-    docWordCountCache.unpersist()
+    val ldaCorpus: RDD[(Long, Vector)] = helper.formattedCorpus
 
     // Instantiate optimizer based on input
     val ldaOptimizer = ldaOptimizerOption match {
@@ -121,162 +84,35 @@
         .setBeta(ldaBeta)
         .setOptimizer(ldaOptimizer)
 
-    // If caller does not provide seed to lda, ie. ldaSeed is empty, lda is seeded automatically set to hash value of class name
-
+    // If caller does not provide seed to lda, ie. ldaSeed is empty,
+    // lda is seeded automatically set to hash value of class name
     if (ldaSeed.nonEmpty) {
       lda.setSeed(ldaSeed.get)
     }
 
-    val (wordTopicMat, docTopicDist) = ldaOptimizer match {
-      case _: EMLDAOptimizer => {
-        val ldaModel = lda.run(ldaCorpus).asInstanceOf[DistributedLDAModel]
+    val model: LDAModel = lda.run(ldaCorpus)
 
-        // Get word topic mix, from Spark documentation:
-        // Inferred topics, where each topic is represented by a distribution over terms.
-        // This is a matrix of size vocabSize x k, where each column is a topic.
-        // No guarantees are given about the ordering of the topics.
-        val wordTopicMat: Matrix = ldaModel.topicsMatrix
+    SpotLDAModel(model)
+  }
 
-        // Topic distribution: for each document, return distribution (vector) over topics for that docs where entry
-        // i is the fraction of the document which belongs to topic i
-        val docTopicDist: RDD[(Long, Vector)] = ldaModel.topicDistributions
+  /**
+    * Load an existing model from HDFS location.
+    *
+    * @param sparkSession       the Spark session.
+    * @param location           the HDFS location for the model.
+    * @param ldaOptimizerOption LDA optimizer, em or online.
+    * @return SpotLDAModel
+    */
+  def load(sparkSession: SparkSession, location: String, ldaOptimizerOption: String): SpotLDAModel = {
+    val sparkContext: SparkContext = sparkSession.sparkContext
 
-        (wordTopicMat, docTopicDist)
-
-      }
-
-      case _: OnlineLDAOptimizer => {
-        val ldaModel = lda.run(ldaCorpus).asInstanceOf[LocalLDAModel]
-
-        // Get word topic mix, from Spark documentation:
-        // Inferred topics, where each topic is represented by a distribution over terms.
-        // This is a matrix of size vocabSize x k, where each column is a topic.
-        // No guarantees are given about the ordering of the topics.
-        val wordTopicMat: Matrix = ldaModel.topicsMatrix
-
-        // Topic distribution: for each document, return distribution (vector) over topics for that docs where entry
-        // i is the fraction of the document which belongs to topic i
-        val docTopicDist: RDD[(Long, Vector)] = ldaModel.topicDistributions(ldaCorpus)
-
-        (wordTopicMat, docTopicDist)
-
-      }
-
+    val model = ldaOptimizerOption match {
+      case "em" => DistributedLDAModel.load(sparkContext, location)
+      case "online" => LocalLDAModel.load(sparkContext, location)
+      case _ => throw new IllegalArgumentException(
+        s"Invalid LDA optimizer $ldaOptimizerOption")
     }
 
-    // Create doc results from vector: convert docID back to string, convert vector of probabilities to array
-    val docToTopicMixDF =
-      formatSparkLDADocTopicOutput(docTopicDist, documentDictionary, sparkSession, precisionUtility)
-
-    documentDictionary.unpersist()
-
-    // Create word results from matrix: convert matrix to sequence, wordIDs back to strings, sequence of
-    // probabilities to array
-    val revWordMap: Map[Int, String] = wordDictionary.map(_.swap)
-
-    val wordResults = formatSparkLDAWordOutput(wordTopicMat, revWordMap)
-
-    // Create output object
-    SpotLDAOutput(docToTopicMixDF, wordResults)
+    SpotLDAModel(model)
   }
-
-  /**
-    * Formats input data for LDA algorithm
-    *
-    * @param docWordCount       RDD with document list and the word count for each document (corpus)
-    * @param documentDictionary DataFrame with a distinct list of documents and its id
-    * @param wordDictionary     immutable Map with distinct list of word and its id
-    * @param sparkSession       the SparkSession
-    * @return
-    */
-  def formatSparkLDAInput(docWordCount: RDD[SpotLDAInput],
-                          documentDictionary: DataFrame,
-                          wordDictionary: Map[String, Int],
-                          sparkSession: SparkSession): RDD[(Long, Vector)] = {
-
-    import sparkSession.implicits._
-
-    val getWordId = {
-      udf((word: String) => (wordDictionary(word)))
-    }
-
-    val docWordCountDF = docWordCount
-      .map({ case SpotLDAInput(doc, word, count) => (doc, word, count) })
-      .toDF(DocumentName, WordName, WordNameWordCount)
-
-    // Convert SpotSparkLDAInput into desired format for Spark LDA: (doc, word, count) -> word count per doc, where RDD
-    // is indexed by DocID
-    val wordCountsPerDocDF = docWordCountDF
-      .join(documentDictionary, docWordCountDF(DocumentName) === documentDictionary(DocumentName))
-      .drop(documentDictionary(DocumentName))
-      .withColumn(WordNumber, getWordId(docWordCountDF(WordName)))
-      .drop(WordName)
-
-    val wordCountsPerDoc: RDD[(Long, Iterable[(Int, Double)])]
-    = wordCountsPerDocDF
-      .select(DocumentNumber, WordNumber, WordNameWordCount)
-      .rdd
-      .map({ case Row(documentId: Long, wordId: Int, wordCount: Int) => (documentId.toLong, (wordId, wordCount.toDouble)) })
-      .groupByKey
-
-    // Sum of distinct words in each doc (words will be repeated between different docs), used for sparse vec size
-    val numUniqueWords = wordDictionary.size
-    val ldaInput: RDD[(Long, Vector)] = wordCountsPerDoc
-      .mapValues({ case vs => Vectors.sparse(numUniqueWords, vs.toSeq) })
-
-    ldaInput
-  }
-
-  /**
-    * Format LDA output topicMatrix for spot-ml scoring
-    *
-    * @param wordTopMat LDA model topicMatrix
-    * @param wordMap    immutable Map with distinct list of word and its id
-    * @return
-    */
-  def formatSparkLDAWordOutput(wordTopMat: Matrix, wordMap: Map[Int, String]): scala.Predef.Map[String, Array[Double]] = {
-
-    // incoming word top matrix is in column-major order and the columns are unnormalized
-    val m = wordTopMat.numRows
-    val n = wordTopMat.numCols
-    val columnSums: Array[Double] = Range(0, n).map(j => (Range(0, m).map(i => wordTopMat(i, j)).sum)).toArray
-
-    val wordProbs: Seq[Array[Double]] = wordTopMat.transpose.toArray.grouped(n).toSeq
-      .map(unnormProbs => unnormProbs.zipWithIndex.map({ case (u, j) => u / columnSums(j) }))
-
-    wordProbs.zipWithIndex.map({ case (topicProbs, wordInd) => (wordMap(wordInd), topicProbs) }).toMap
-  }
-
-  /**
-    * Format LDA output topicDistribution for spot-ml scoring
-    *
-    * @param docTopDist         LDA model topicDistribution
-    * @param documentDictionary DataFrame with a distinct list of documents and its id
-    * @param sparkSession       the SparkSession
-    * @param precisionUtility   FloatPointPrecisionUtility implementation based on user configuration (64 or 32 bit)
-    * @return
-    */
-  def formatSparkLDADocTopicOutput(docTopDist: RDD[(Long, Vector)], documentDictionary: DataFrame, sparkSession: SparkSession,
-                                   precisionUtility: FloatPointPrecisionUtility):
-  DataFrame = {
-    import sparkSession.implicits._
-
-    val topicDistributionToArray = udf((topicDistribution: Vector) => topicDistribution.toArray)
-    val documentToTopicDistributionDF = docTopDist.toDF(DocumentNumber, TopicProbabilityMix)
-
-    val documentToTopicDistributionArray = documentToTopicDistributionDF
-      .join(documentDictionary, documentToTopicDistributionDF(DocumentNumber) === documentDictionary(DocumentNumber))
-      .drop(documentDictionary(DocumentNumber))
-      .drop(documentToTopicDistributionDF(DocumentNumber))
-      .select(DocumentName, TopicProbabilityMix)
-      .withColumn(TopicProbabilityMixArray, topicDistributionToArray(documentToTopicDistributionDF(TopicProbabilityMix)))
-      .selectExpr(s"$DocumentName  AS $DocumentName", s"$TopicProbabilityMixArray AS $TopicProbabilityMix")
-
-    precisionUtility.castColumn(documentToTopicDistributionArray, TopicProbabilityMix)
-  }
-
-  case class SpotLDAInput(doc: String, word: String, count: Int) extends Serializable
-
-  case class SpotLDAOutput(docToTopicMix: DataFrame, wordResults: Map[String, Array[Double]])
-
 }
\ No newline at end of file
diff --git a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
index 6be11e1..4e09616 100644
--- a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
@@ -20,11 +20,10 @@
 import org.apache.log4j.Logger
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
-import org.apache.spot.lda.SpotLDAWrapper
-import org.apache.spot.lda.SpotLDAWrapper.{SpotLDAInput, SpotLDAOutput}
 import org.apache.spot.lda.SpotLDAWrapperSchema._
+import org.apache.spot.lda.{SpotLDAHelper, SpotLDAInput, SpotLDAResult, SpotLDAWrapper}
 import org.apache.spot.netflow.FlowSchema._
 import org.apache.spot.netflow.FlowWordCreator
 import org.apache.spot.utilities.FloatPointPrecisionUtility
@@ -36,7 +35,7 @@
   * The model uses a topic-modelling approach that:
   * 1. Simplifies netflow records into words, one word at the source IP and another (possibly different) at the
   * destination IP.
-  * 2. The netflow words about each IP are treated as collections of thes words.
+  * 2. The netflow words about each IP are treated as collections of these words.
   * 3. A topic modelling approach is used to infer a collection of "topics" that represent common profiles
   * of network traffic. These "topics" are probability distributions on words.
   * 4. Each IP has a mix of topics corresponding to its behavior.
@@ -112,7 +111,7 @@
 }
 
 /**
-  * Contains dataframe schema information as well as the train-from-dataframe routine
+  * Contains DataFrame schema information as well as the train-from-dataframe routine
   * (which is a kind of factory routine) for [[FlowSuspiciousConnectsModel]] instances.
   *
   */
@@ -127,7 +126,7 @@
     IbytField,
     IpktField))
 
-  val ModelColumns = ModelSchema.fieldNames.toList.map(col)
+  val ModelColumns: List[Column] = ModelSchema.fieldNames.toList.map(col)
 
 
   def trainModel(sparkSession: SparkSession,
@@ -146,13 +145,12 @@
       config.duplicationFactor))
 
 
+    import sparkSession.implicits._
     // simplify netflow log entries into "words"
 
     val dataWithWords = totalRecords.withColumn(SourceWord, FlowWordCreator.srcWordUDF(ModelColumns: _*))
       .withColumn(DestinationWord, FlowWordCreator.dstWordUDF(ModelColumns: _*))
 
-    import sparkSession.implicits._
-
     // Aggregate per-word counts at each IP
     val srcWordCounts = dataWithWords
       .filter(dataWithWords(SourceWord).notEqual(InvalidDataHandler.WordError))
@@ -173,20 +171,19 @@
         .reduceByKey(_ + _)
         .map({ case ((ip, word), count) => SpotLDAInput(ip, word, count) })
 
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(ipWordCounts, config.precisionUtility, sparkSession)
 
-    val SpotLDAOutput(ipToTopicMix, wordToPerTopicProb) = SpotLDAWrapper.runLDA(sparkSession,
-      ipWordCounts,
-      config.topicCount,
+    val model = SpotLDAWrapper.run(config.topicCount,
       logger,
       config.ldaPRGSeed,
       config.ldaAlpha,
       config.ldaBeta,
       config.ldaOptimizer,
       config.ldaMaxiterations,
-      config.precisionUtility)
+      spotLDAHelper)
 
-    new FlowSuspiciousConnectsModel(config.topicCount,
-      ipToTopicMix,
-      wordToPerTopicProb)
+    val results: SpotLDAResult = model.predict(spotLDAHelper)
+
+    new FlowSuspiciousConnectsModel(config.topicCount, results.documentToTopicMix, results.wordToTopicMix)
   }
 }
\ No newline at end of file
diff --git a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
index 3ef60af..7332fe4 100644
--- a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
@@ -25,9 +25,8 @@
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.SuspiciousConnectsScoreFunction
-import org.apache.spot.lda.SpotLDAWrapper
-import org.apache.spot.lda.SpotLDAWrapper.{SpotLDAInput, SpotLDAOutput}
 import org.apache.spot.lda.SpotLDAWrapperSchema._
+import org.apache.spot.lda.{SpotLDAHelper, SpotLDAInput, SpotLDAResult, SpotLDAWrapper}
 import org.apache.spot.proxy.ProxySchema._
 import org.apache.spot.utilities._
 import org.apache.spot.utilities.data.validation.InvalidDataHandler
@@ -92,7 +91,7 @@
   */
 object ProxySuspiciousConnectsModel {
 
-  // These buckets are optimized to datasets used for training. Last bucket is of infinite size to ensure fit.
+  // These buckets are optimized to data sets used for training. Last bucket is of infinite size to ensure fit.
   // The maximum value of entropy is given by log k where k is the number of distinct categories.
   // Given that the alphabet and number of characters is finite the maximum value for entropy is upper bounded.
   // Bucket number and size can be changed to provide less/more granularity
@@ -119,8 +118,8 @@
     * for clustering in the topic model.
     *
     * @param sparkSession Spark Session
-    * @param logger       Logge object.
-    * @param config       SuspiciousConnetsArgumnetParser.Config object containg CLI arguments.
+    * @param logger       Logger object.
+    * @param config       SuspiciousConnectsArgumentParser.Config object containing CLI arguments.
     * @param inputRecords Dataframe for training data, with columns Host, Time, ReqMethod, FullURI, ResponseContentType,
     *                     UserAgent, RespCode (as defined in ProxySchema object).
     * @return ProxySuspiciousConnectsModel
@@ -130,7 +129,7 @@
                  config: SuspiciousConnectsConfig,
                  inputRecords: DataFrame): ProxySuspiciousConnectsModel = {
 
-    logger.info("training new proxy suspcious connects model")
+    logger.info("training new proxy suspicious connects model")
 
 
     val selectedRecords =
@@ -145,24 +144,24 @@
         .reduceByKey(_ + _).collect()
         .toMap
 
-    val agentToCountBC = sparkSession.sparkContext.broadcast(agentToCount)
-
     val docWordCount: RDD[SpotLDAInput] =
       getIPWordCounts(sparkSession, logger, selectedRecords, config.feedbackFile, config.duplicationFactor,
         agentToCount)
 
-    val SpotLDAOutput(ipToTopicMixDF, wordResults) = SpotLDAWrapper.runLDA(sparkSession,
-      docWordCount,
-      config.topicCount,
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(docWordCount, config.precisionUtility, sparkSession)
+
+    val model = SpotLDAWrapper.run(config.topicCount,
       logger,
       config.ldaPRGSeed,
       config.ldaAlpha,
       config.ldaBeta,
       config.ldaOptimizer,
       config.ldaMaxiterations,
-      config.precisionUtility)
+      spotLDAHelper)
 
-    new ProxySuspiciousConnectsModel(config.topicCount, ipToTopicMixDF, wordResults)
+    val results: SpotLDAResult = model.predict(spotLDAHelper)
+
+    new ProxySuspiciousConnectsModel(config.topicCount, results.documentToTopicMix, results.wordToTopicMix)
 
   }
 
diff --git a/spot-ml/src/main/scala/org/apache/spot/utilities/TopDomains.scala b/spot-ml/src/main/scala/org/apache/spot/utilities/TopDomains.scala
index 0183027..083cfe7 100644
--- a/spot-ml/src/main/scala/org/apache/spot/utilities/TopDomains.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/utilities/TopDomains.scala
@@ -28,7 +28,6 @@
 
   val TopDomains: Set[String] = Source.fromFile(alexaTop1MPath).getLines.map(line => {
     val parts = line.split(",")
-    val l = parts.length
     parts(1).split('.')(0)
   }).toSet
 }
diff --git a/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAHelperTest.scala b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAHelperTest.scala
new file mode 100644
index 0000000..93828b2
--- /dev/null
+++ b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAHelperTest.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spot.lda
+
+import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spot.lda.SpotLDAWrapperSchema.TopicProbabilityMix
+import org.apache.spot.testutils.TestingSparkContextFlatSpec
+import org.apache.spot.utilities.{FloatPointPrecisionUtility32, FloatPointPrecisionUtility64}
+import org.scalatest.Matchers
+
+/**
+  * Created by rabarona on 7/17/17.
+  */
+class SpotLDAHelperTest extends TestingSparkContextFlatSpec with Matchers {
+
+  "formatSparkLDAInput" should "return input in RDD[(Long, Vector)] (collected as Array for testing) format. The index " +
+    "is the docID, values are the vectors of word occurrences in that doc" in {
+
+
+    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
+      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
+      SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
+      SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
+
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(documentWordData, FloatPointPrecisionUtility64, sparkSession)
+
+    val sparkLDAInput: RDD[(Long, Vector)] = spotLDAHelper.formattedCorpus
+    val sparkLDAInArr: Array[(Long, Vector)] = sparkLDAInput.collect()
+
+    sparkLDAInArr shouldBe Array((0, Vectors.sparse(4, Array(0, 3), Array(5.0, 8.0))), (2, Vectors.sparse(4, Array
+    (1), Array(2.0))), (1, Vectors.sparse(4, Array(2), Array(4.0))))
+  }
+
+  "formatSparkLDADocTopicOutput" should "return RDD[(String,Array(Double))] after converting doc results from vector " +
+    "using PrecisionUtilityDouble: convert docID back to string, convert vector of probabilities to array" in {
+
+    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
+      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
+      SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
+      SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
+
+    val docTopicDist: RDD[(Long, Vector)] = sparkSession.sparkContext.parallelize(
+      Array((0.toLong, Vectors.dense(0.15, 0.3, 0.5, 0.05)),
+        (1.toLong, Vectors.dense(0.25, 0.15, 0.4, 0.2)),
+        (2.toLong, Vectors.dense(0.4, 0.1, 0.3, 0.2))))
+
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(documentWordData, FloatPointPrecisionUtility64, sparkSession)
+
+    val sparkDocRes: DataFrame = spotLDAHelper.formatDocumentDistribution(docTopicDist)
+
+    import testImplicits._
+    val documents = sparkDocRes.map({ case Row(documentName: String, docProbabilities: Seq[Double]) => (documentName,
+      docProbabilities)
+    }).collect
+
+    val documentProbabilities = sparkDocRes.select(TopicProbabilityMix).first.toSeq(0).asInstanceOf[Seq[Double]]
+
+    documents should contain("192.168.1.1", Seq(0.15, 0.3, 0.5, 0.05))
+    documents should contain("10.10.98.123", Seq(0.25, 0.15, 0.4, 0.2))
+    documents should contain("66.23.45.11", Seq(0.4, 0.1, 0.3, 0.2))
+
+    documentProbabilities(0) shouldBe a[java.lang.Double]
+
+  }
+
+  it should "return RDD[(String,Array(Float))] after converting doc results from vector " +
+    "using PrecisionUtilityFloat: convert docID back to string, convert vector of probabilities to array" in {
+
+    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
+      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
+      SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
+      SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
+
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(documentWordData, FloatPointPrecisionUtility32, sparkSession)
+
+    val docTopicDist: RDD[(Long, Vector)] = sparkSession.sparkContext.parallelize(
+      Array((0.toLong, Vectors.dense(0.15, 0.3, 0.5, 0.05)),
+        (1.toLong, Vectors.dense(0.25, 0.15, 0.4, 0.2)),
+        (2.toLong, Vectors.dense(0.4, 0.1, 0.3, 0.2))))
+
+    val sparkDocRes: DataFrame = spotLDAHelper.formatDocumentDistribution(docTopicDist)
+
+    import testImplicits._
+    val documents = sparkDocRes.map({ case Row(documentName: String, docProbabilities: Seq[Float]) => (documentName,
+      docProbabilities)
+    }).collect
+
+    val documentProbabilities = sparkDocRes.select(TopicProbabilityMix).first.toSeq(0).asInstanceOf[Seq[Float]]
+
+    documents should contain("192.168.1.1", Seq(0.15f, 0.3f, 0.5f, 0.05f))
+    documents should contain("10.10.98.123", Seq(0.25f, 0.15f, 0.4f, 0.2f))
+    documents should contain("66.23.45.11", Seq(0.4f, 0.1f, 0.3f, 0.2f))
+
+    documentProbabilities(0) shouldBe a[java.lang.Float]
+  }
+
+  "formatSparkLDAWordOutput" should "return Map[Int,String] after converting word matrix to sequence, wordIDs back " +
+    "to strings, and sequence of probabilities to array" in {
+
+    val testMat = Matrices.dense(4, 4, Array(0.5, 0.2, 0.05, 0.25, 0.25, 0.1, 0.15, 0.5, 0.1, 0.4, 0.25, 0.25, 0.7, 0.2, 0.02, 0.08))
+
+    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "23.0_7.0_7.0_4.0", 8),
+      SpotLDAInput("10.10.98.123", "80.0_7.0_7.0_4.0", 4),
+      SpotLDAInput("66.23.45.11", "333333.0_7.0_7.0_4.0", 2),
+      SpotLDAInput("192.168.1.2", "-1_23.0_7.0_7.0_4.0", 5)))
+
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(documentWordData, FloatPointPrecisionUtility64, sparkSession)
+
+    val sparkWordRes = spotLDAHelper.formatTopicDistributions(testMat)
+
+    sparkWordRes should contain key ("23.0_7.0_7.0_4.0")
+    sparkWordRes should contain key ("80.0_7.0_7.0_4.0")
+    sparkWordRes should contain key ("333333.0_7.0_7.0_4.0")
+    sparkWordRes should contain key ("-1_23.0_7.0_7.0_4.0")
+  }
+}
diff --git a/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
index 5c40068..ae25d89 100644
--- a/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
@@ -18,25 +18,18 @@
 package org.apache.spot.lda
 
 import org.apache.log4j.{Level, LogManager}
-import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spot.lda.SpotLDAWrapper._
 import org.apache.spot.lda.SpotLDAWrapperSchema._
 import org.apache.spot.testutils.TestingSparkContextFlatSpec
 import org.apache.spot.utilities.{FloatPointPrecisionUtility32, FloatPointPrecisionUtility64}
 import org.scalatest.Matchers
 
-import scala.collection.immutable.Map
-
 class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
 
   "SparkLDA" should "handle an extremely unbalanced two word doc with EM optimizer" in {
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
-    val ldaAlpha =  1.02
+    val ldaAlpha = 1.02
     val ldaBeta = 1.001
     val ldaMaxIterations = 20
 
@@ -46,16 +39,20 @@
     val dogWorld = SpotLDAInput("pets", "dog", 999)
 
     val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      optimizer ,ldaMaxIterations, FloatPointPrecisionUtility64)
 
-    val topicMixDF = out.docToTopicMix
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility64, sparkSession)
+    val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, spotLDAHelper)
+
+    val results = model.predict(spotLDAHelper)
+
+    val topicMixDF = results.documentToTopicMix
 
     val topicMix =
-      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq(0)
+      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq.head
         .asInstanceOf[Seq[Double]].toArray
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
+    val catTopics = results.wordToTopicMix("cat")
+    val dogTopics = results.wordToTopicMix("dog")
 
     Math.abs(topicMix(0) * catTopics(0) + topicMix(1) * catTopics(1)) should be < 0.01
     Math.abs(0.999 - (topicMix(0) * dogTopics(0) + topicMix(1) * dogTopics(1))) should be < 0.01
@@ -65,9 +62,9 @@
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
-    val ldaAlpha =  1.2
+    val ldaAlpha = 1.02
     val ldaBeta = 1.001
-    val ldaMaxIterations = 20
+    val ldaMaxIterations = 100
 
     val optimizer = "em"
 
@@ -75,20 +72,24 @@
     val dogWorld = SpotLDAInput("dog world", "dog", 1)
 
     val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      optimizer ,ldaMaxIterations, FloatPointPrecisionUtility64)
 
-    val topicMixDF = out.docToTopicMix
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility64, sparkSession)
+    val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, spotLDAHelper)
+
+    val results = model.predict(spotLDAHelper)
+
+    val topicMixDF = results.documentToTopicMix
     val dogTopicMix: Array[Double] =
       topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first()
-        .toSeq(0).asInstanceOf[Seq[Double]].toArray
+        .toSeq.head.asInstanceOf[Seq[Double]].toArray
 
     val catTopicMix: Array[Double] =
       topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first()
-        .toSeq(0).asInstanceOf[Seq[Double]].toArray
+        .toSeq.head.asInstanceOf[Seq[Double]].toArray
 
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
+    val catTopics = results.wordToTopicMix("cat")
+    val dogTopics = results.wordToTopicMix("dog")
 
     Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
     Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
@@ -98,7 +99,7 @@
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
-    val ldaAlpha =  0.0009
+    val ldaAlpha = 0.0009
     val ldaBeta = 0.00001
     val ldaMaxIterations = 400
 
@@ -108,16 +109,20 @@
     val dogWorld = SpotLDAInput("pets", "dog", 999)
 
     val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      optimizer, ldaMaxIterations, FloatPointPrecisionUtility64)
 
-    val topicMixDF = out.docToTopicMix
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility64, sparkSession)
+    val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, spotLDAHelper)
+
+    val results = model.predict(spotLDAHelper)
+
+    val topicMixDF = results.documentToTopicMix
 
     val topicMix =
-      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq(0)
+      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq.head
         .asInstanceOf[Seq[Double]].toArray
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
+    val catTopics = results.wordToTopicMix("cat")
+    val dogTopics = results.wordToTopicMix("dog")
 
     Math.abs(topicMix(0) * catTopics(0) + topicMix(1) * catTopics(1)) should be < 0.01
     Math.abs(0.999 - (topicMix(0) * dogTopics(0) + topicMix(1) * dogTopics(1))) should be < 0.01
@@ -127,7 +132,7 @@
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
-    val ldaAlpha =  0.0009
+    val ldaAlpha = 0.0009
     val ldaBeta = 0.00001
     val ldaMaxIterations = 400
     val optimizer = "online"
@@ -136,20 +141,25 @@
     val dogWorld = SpotLDAInput("dog world", "dog", 1)
 
     val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      optimizer, ldaMaxIterations, FloatPointPrecisionUtility64)
 
-    val topicMixDF = out.docToTopicMix
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility64, sparkSession)
+    val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, spotLDAHelper)
+
+    val results = model.predict(spotLDAHelper)
+
+    val topicMixDF = results.documentToTopicMix
+
     val dogTopicMix: Array[Double] =
       topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first()
-        .toSeq(0).asInstanceOf[Seq[Double]].toArray
+        .toSeq.head.asInstanceOf[Seq[Double]].toArray
 
     val catTopicMix: Array[Double] =
       topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first()
-        .toSeq(0).asInstanceOf[Seq[Double]].toArray
+        .toSeq.head.asInstanceOf[Seq[Double]].toArray
 
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
+    val catTopics = results.wordToTopicMix("cat")
+    val dogTopics = results.wordToTopicMix("dog")
 
     Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
     Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
@@ -159,7 +169,7 @@
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
-    val ldaAlpha =  1.02
+    val ldaAlpha = 1.02
     val ldaBeta = 1.001
     val ldaMaxIterations = 20
 
@@ -169,16 +179,20 @@
     val dogWorld = SpotLDAInput("pets", "dog", 999)
 
     val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      optimizer, ldaMaxIterations, FloatPointPrecisionUtility32)
 
-    val topicMixDF = out.docToTopicMix
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility32, sparkSession)
+    val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, spotLDAHelper)
+
+    val results = model.predict(spotLDAHelper)
+
+    val topicMixDF = results.documentToTopicMix
 
     val topicMix =
-      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq(0)
+      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq.head
         .asInstanceOf[Seq[Float]].toArray
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
+    val catTopics = results.wordToTopicMix("cat")
+    val dogTopics = results.wordToTopicMix("dog")
 
     Math.abs(topicMix(0).toDouble * catTopics(0) + topicMix(1).toDouble * catTopics(1)) should be < 0.01
     Math.abs(0.999 - (topicMix(0).toDouble * dogTopics(0) + topicMix(1).toDouble * dogTopics(1))) should be < 0.01
@@ -188,7 +202,7 @@
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
-    val ldaAlpha =  1.02
+    val ldaAlpha = 1.02
     val ldaBeta = 1.001
     val ldaMaxIterations = 20
 
@@ -198,134 +212,28 @@
     val dogWorld = SpotLDAInput("dog world", "dog", 1)
 
     val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      optimizer, ldaMaxIterations, FloatPointPrecisionUtility32)
 
-    val topicMixDF = out.docToTopicMix
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility32, sparkSession)
+    val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, spotLDAHelper)
+
+    val results = model.predict(spotLDAHelper)
+
+    val topicMixDF = results.documentToTopicMix
+
     val dogTopicMix: Array[Float] =
-      topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first().toSeq(0)
+      topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first().toSeq.head
         .asInstanceOf[Seq[Float]].toArray
 
     val catTopicMix: Array[Float] =
-      topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first().toSeq(0)
+      topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first().toSeq.head
         .asInstanceOf[Seq[Float]].toArray
 
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
+    val catTopics = results.wordToTopicMix("cat")
+    val dogTopics = results.wordToTopicMix("dog")
 
     Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
     Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
   }
 
-  "formatSparkLDAInput" should "return input in RDD[(Long, Vector)] (collected as Array for testing) format. The index " +
-    "is the docID, values are the vectors of word occurrences in that doc" in {
-
-
-    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
-      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
-      SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
-      SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
-
-    val wordDictionary = Map("333333_7.0_0.0_1.0" -> 0, "1111111_6.0_3.0_5.0" -> 1, "-1_43_7.0_2.0_6.0" -> 2,
-      "-1_80_6.0_1.0_1.0" -> 3)
-
-    val documentDictionary: DataFrame = sparkSession.createDataFrame(documentWordData
-      .map({ case SpotLDAInput(doc, word, count) => doc })
-      .distinct
-      .zipWithIndex.map({ case (d, c) => Row(d, c) }), StructType(List(DocumentNameField, DocumentNumberField)))
-
-
-    val sparkLDAInput: RDD[(Long, Vector)] = SpotLDAWrapper.formatSparkLDAInput(documentWordData,
-      documentDictionary, wordDictionary, sparkSession)
-    val sparkLDAInArr: Array[(Long, Vector)] = sparkLDAInput.collect()
-
-    sparkLDAInArr shouldBe Array((0, Vectors.sparse(4, Array(0, 3), Array(8.0, 5.0))), (2, Vectors.sparse(4, Array
-    (2), Array(2.0))), (1, Vectors.sparse(4, Array(1), Array(4.0))))
-  }
-
-  "formatSparkLDADocTopicOutput" should "return RDD[(String,Array(Double))] after converting doc results from vector " +
-    "using PrecisionUtilityDouble: convert docID back to string, convert vector of probabilities to array" in {
-
-    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
-      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
-      SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
-      SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
-
-    val documentDictionary: DataFrame = sparkSession.createDataFrame(documentWordData
-      .map({ case SpotLDAInput(doc, word, count) => doc })
-      .distinct
-      .zipWithIndex.map({ case (d, c) => Row(d, c) }), StructType(List(DocumentNameField, DocumentNumberField)))
-
-    val docTopicDist: RDD[(Long, Vector)] = sparkSession.sparkContext.parallelize(
-      Array((0.toLong, Vectors.dense(0.15, 0.3, 0.5, 0.05)),
-        (1.toLong, Vectors.dense(0.25, 0.15, 0.4, 0.2)),
-        (2.toLong, Vectors.dense(0.4, 0.1, 0.3, 0.2))))
-
-    val sparkDocRes: DataFrame = formatSparkLDADocTopicOutput(docTopicDist, documentDictionary, sparkSession,
-      FloatPointPrecisionUtility64)
-
-    import testImplicits._
-    val documents = sparkDocRes.map({ case Row(documentName: String, docProbabilities: Seq[Double]) => (documentName,
-      docProbabilities)
-    }).collect
-
-    val documentProbabilities = sparkDocRes.select(TopicProbabilityMix).first.toSeq(0).asInstanceOf[Seq[Double]]
-
-    documents should contain("192.168.1.1", Seq(0.15, 0.3, 0.5, 0.05))
-    documents should contain("10.10.98.123", Seq(0.25, 0.15, 0.4, 0.2))
-    documents should contain("66.23.45.11", Seq(0.4, 0.1, 0.3, 0.2))
-
-    documentProbabilities(0) shouldBe a[java.lang.Double]
-
-  }
-
-  it should "return RDD[(String,Array(Float))] after converting doc results from vector " +
-    "using PrecisionUtilityFloat: convert docID back to string, convert vector of probabilities to array" in {
-
-    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
-      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
-      SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
-      SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
-
-    val documentDictionary: DataFrame = sparkSession.createDataFrame(documentWordData
-      .map({ case SpotLDAInput(doc, word, count) => doc })
-      .distinct
-      .zipWithIndex.map({ case (d, c) => Row(d, c) }), StructType(List(DocumentNameField, DocumentNumberField)))
-
-    val docTopicDist: RDD[(Long, Vector)] = sparkSession.sparkContext.parallelize(
-      Array((0.toLong, Vectors.dense(0.15, 0.3, 0.5, 0.05)),
-        (1.toLong, Vectors.dense(0.25, 0.15, 0.4, 0.2)),
-        (2.toLong, Vectors.dense(0.4, 0.1, 0.3, 0.2))))
-
-    val sparkDocRes: DataFrame = formatSparkLDADocTopicOutput(docTopicDist, documentDictionary, sparkSession,
-      FloatPointPrecisionUtility32)
-
-    import testImplicits._
-    val documents = sparkDocRes.map({ case Row(documentName: String, docProbabilities: Seq[Float]) => (documentName,
-      docProbabilities)
-    }).collect
-
-    val documentProbabilities = sparkDocRes.select(TopicProbabilityMix).first.toSeq(0).asInstanceOf[Seq[Float]]
-
-    documents should contain("192.168.1.1", Seq(0.15f, 0.3f, 0.5f, 0.05f))
-    documents should contain("10.10.98.123", Seq(0.25f, 0.15f, 0.4f, 0.2f))
-    documents should contain("66.23.45.11", Seq(0.4f, 0.1f, 0.3f, 0.2f))
-
-    documentProbabilities(0) shouldBe a[java.lang.Float]
-  }
-
-  "formatSparkLDAWordOutput" should "return Map[Int,String] after converting word matrix to sequence, wordIDs back " +
-    "to strings, and sequence of probabilities to array" in {
-    val testMat = Matrices.dense(4, 4, Array(0.5, 0.2, 0.05, 0.25, 0.25, 0.1, 0.15, 0.5, 0.1, 0.4, 0.25, 0.25, 0.7, 0.2, 0.02, 0.08))
-
-    val wordDictionary = Map("-1_23.0_7.0_7.0_4.0" -> 3, "23.0_7.0_7.0_4.0" -> 0, "333333.0_7.0_7.0_4.0" -> 2, "80.0_7.0_7.0_4.0" -> 1)
-    val revWordMap: Map[Int, String] = wordDictionary.map(_.swap)
-
-    val sparkWordRes = formatSparkLDAWordOutput(testMat, revWordMap)
-
-    sparkWordRes should contain key ("23.0_7.0_7.0_4.0")
-    sparkWordRes should contain key ("80.0_7.0_7.0_4.0")
-    sparkWordRes should contain key ("333333.0_7.0_7.0_4.0")
-    sparkWordRes should contain key ("-1_23.0_7.0_7.0_4.0")
-  }
 }
\ No newline at end of file