import org.apache.log4j.Logger
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import scala.collection.immutable.Map
/**
 * Spark LDA implementation
 * Contains routines for LDA using the Scala Spark implementation from mllib:
 * 1. Creates the list of unique documents and words, and the model input based on those two
 * 2. Processes the model using Spark LDA
 * 3. Reads Spark LDA results: topic distributions per document (docTopicDist) and word distributions per topic (wordTopicMat)
 * 4. Converts wordTopicMat (Matrix) and docTopicDist (RDD) to the appropriate formats (maps) originally specified in LDA-C
 */
object SpotLDAWrapper {
/**
 * Runs Spark MLlib LDA over (document, word, count) records and packages the result as a SpotLDAOutput.
 *
 * The visible steps are: cache the input, derive a word dictionary and a document dictionary from it,
 * pick an optimizer from `ldaOptimizerOption`, read the word-topic matrix and the per-document topic
 * distributions from the trained model, and format both through `formatSparkLDADocTopicOutput` and
 * `formatSparkLDAWordOutput`.
 *
 * NOTE(review): several statements in this method look incomplete as shown here (for example the
 * right-hand sides of `ldaCorpus`, `ldaModel` and `revWordMap`) - confirm against the canonical source.
 *
 * @param sparkContext Spark context; not referenced by the statements visible here.
 * @param sqlContext SQL context; supplies the implicits needed for `toDF` and is forwarded to the
 *                   document-topic formatter.
 * @param docWordCount input records, one per (doc, word, count) triple.
 * @param topicCount presumably the number of topics to fit - TODO confirm, it is not referenced by the
 *                   statements visible here.
 * @param logger presumably receives the "Running Spark LDA ..." message - TODO confirm.
 * @param ldaSeed optional seed; only consulted when it is non-empty.
 * @param ldaAlpha alpha value reported in the "Running Spark LDA ..." message.
 * @param ldaBeta beta value reported in the "Running Spark LDA ..." message.
 * @param ldaOptimizerOption optimizer selector; only "em" and "online" are accepted.
 * @param maxIterations iteration limit reported in the "Running Spark LDA ..." message.
 * @param precisionUtility forwarded to `formatSparkLDADocTopicOutput`.
 * @return a SpotLDAOutput built from the formatted document-topic DataFrame and the word results map.
 * @throws IllegalArgumentException if `ldaOptimizerOption` is neither "em" nor "online".
 */
def runLDA(sparkContext: SparkContext,
sqlContext: SQLContext,
docWordCount: RDD[SpotLDAInput],
topicCount: Int,
logger: Logger,
ldaSeed: Option[Long],
ldaAlpha: Double,
ldaBeta: Double,
ldaOptimizerOption: String,
maxIterations: Int,
precisionUtility: FloatPointPrecisionUtility): SpotLDAOutput = {
import sqlContext.implicits._
// The input is cached because both dictionaries below are derived from it.
val docWordCountCache = docWordCount.cache()
// Forcing an action to cache results.
// Create the word dictionary (word -> index) for further usage.
val wordDictionary: Map[String, Int] = {
val words = docWordCountCache
.map({ case SpotLDAInput(doc, word, count) => word })
// Document dictionary: a DataFrame with the columns DocumentName and DocumentNumber.
val documentDictionary: DataFrame = docWordCountCache
.map({ case SpotLDAInput(doc, word, count) => doc })
.toDF(DocumentName, DocumentNumber)
// Structure corpus so that the index is the docID, values are the vectors of word occurrences in that doc
val ldaCorpus: RDD[(Long, Vector)] =
// Instantiate optimizer based on input.
// For "online", the mini-batch fraction is 0.75 when the corpus has fewer than 2 documents and
// (0.05 + 1) / corpusSize otherwise.
val ldaOptimizer = ldaOptimizerOption match {
case "em" => new EMLDAOptimizer
case "online" => new OnlineLDAOptimizer().setOptimizeDocConcentration(true).setMiniBatchFraction({
val corpusSize = ldaCorpus.count()
if (corpusSize < 2) 0.75
else (0.05 + 1) / corpusSize
case _ => throw new IllegalArgumentException(
s"Invalid LDA optimizer $ldaOptimizerOption")
}"Running Spark LDA with params alpha = $ldaAlpha beta = $ldaBeta " +
s"Max iterations = $maxIterations Optimizer = $ldaOptimizerOption")
// Set LDA params from input args
val lda =
new LDA()
// If the caller does not provide a seed (ldaSeed is empty), LDA is seeded automatically.
// The original note says the default is the hash value of the class name - TODO confirm against the
// mllib LDA documentation.
if (ldaSeed.nonEmpty) {
// The two optimizers are handled separately: the EM branch names DistributedLDAModel and reads
// topicDistributions directly, while the online branch names LocalLDAModel and passes the corpus
// to topicDistributions.
val (wordTopicMat, docTopicDist) = ldaOptimizer match {
case _: EMLDAOptimizer => {
val ldaModel =[DistributedLDAModel]
// Get word topic mix, from Spark documentation:
// Inferred topics, where each topic is represented by a distribution over terms.
// This is a matrix of size vocabSize x k, where each column is a topic.
// No guarantees are given about the ordering of the topics.
val wordTopicMat: Matrix = ldaModel.topicsMatrix
// Topic distribution: for each document, return distribution (vector) over topics for that docs where entry
// i is the fraction of the document which belongs to topic i
val docTopicDist: RDD[(Long, Vector)] = ldaModel.topicDistributions
(wordTopicMat, docTopicDist)
case _: OnlineLDAOptimizer => {
val ldaModel =[LocalLDAModel]
// Get word topic mix, from Spark documentation:
// Inferred topics, where each topic is represented by a distribution over terms.
// This is a matrix of size vocabSize x k, where each column is a topic.
// No guarantees are given about the ordering of the topics.
val wordTopicMat: Matrix = ldaModel.topicsMatrix
// Topic distribution: for each document, return distribution (vector) over topics for that docs where entry
// i is the fraction of the document which belongs to topic i
val docTopicDist: RDD[(Long, Vector)] = ldaModel.topicDistributions(ldaCorpus)
(wordTopicMat, docTopicDist)
// Create doc results from vector: convert docID back to string, convert vector of probabilities to array
val docToTopicMixDF =
formatSparkLDADocTopicOutput(docTopicDist, documentDictionary, sqlContext, precisionUtility)
// Create word results from matrix: convert matrix to sequence, wordIDs back to strings, sequence of
// probabilities to array
val revWordMap: Map[Int, String] =
val wordResults = formatSparkLDAWordOutput(wordTopicMat, revWordMap)
// Create output object
SpotLDAOutput(docToTopicMixDF, wordResults)
/**
 * Converts (doc, word, count) records into the per-document vector format used as the LDA corpus.
 *
 * Each record is joined with `documentDictionary` on the document name, the word is translated to its
 * index through `wordDictionary`, and the resulting (wordId, count) pairs of a document are turned
 * into one sparse vector whose size is the number of entries in `wordDictionary`.
 *
 * NOTE(review): this method looks incomplete as shown here (the `getWordId` block is not closed and no
 * final result expression is visible) - confirm against the canonical source.
 *
 * @param docWordCount input records, one per (doc, word, count) triple.
 * @param documentDictionary DataFrame holding at least the DocumentName and DocumentNumber columns.
 * @param wordDictionary word -> index lookup; every word in `docWordCount` is looked up with `apply`,
 *                       so a word absent from the dictionary makes the lookup fail.
 * @param sqlContext SQL context; supplies the implicits needed for `toDF`.
 * @return RDD keyed by document number, with one vector of word counts per document.
 */
def formatSparkLDAInput(docWordCount: RDD[SpotLDAInput],
documentDictionary: DataFrame,
wordDictionary: Map[String, Int],
sqlContext: SQLContext): RDD[(Long, Vector)] = {
import sqlContext.implicits._
// UDF translating a word into its index in wordDictionary.
val getWordId = {
udf((word: String) => (wordDictionary(word)))
// Flatten the case class into a three-column DataFrame so it can be joined by document name.
val docWordCountDF = docWordCount
.map({ case SpotLDAInput(doc, word, count) => (doc, word, count) })
.toDF(DocumentName, WordName, WordNameWordCount)
// Convert SpotSparkLDAInput into desired format for Spark LDA: (doc, word, count) -> word count per doc, where RDD
// is indexed by DocID
val wordCountsPerDocDF = docWordCountDF
.join(documentDictionary, docWordCountDF(DocumentName) === documentDictionary(DocumentName))
.withColumn(WordNumber, getWordId(docWordCountDF(WordName)))
// Rows are expected to carry (Long, Int, Int); the count is widened to Double for use as a vector value.
val wordCountsPerDoc: RDD[(Long, Iterable[(Int, Double)])]
= wordCountsPerDocDF
.select(DocumentNumber, WordNumber, WordNameWordCount)
.map({ case Row(documentId: Long, wordId: Int, wordCount: Int) => (documentId.toLong, (wordId, wordCount.toDouble)) })
// Size of the whole word dictionary (not a per-document figure); used as the size of every sparse vector.
val numUniqueWords = wordDictionary.size
val ldaInput: RDD[(Long, Vector)] = wordCountsPerDoc
.mapValues({ case vs => Vectors.sparse(numUniqueWords, vs.toSeq) })
/**
 * Converts the word-topic matrix into a map from word to its array of per-topic values, each value
 * divided by the sum of its topic column.
 *
 * NOTE(review): the final expression looks truncated as shown here - confirm against the canonical
 * source.
 *
 * @param wordTopMat word-topic matrix; per the note below its columns are unnormalized.
 * @param wordMap index -> word lookup used to label each entry of the result.
 * @return map keyed by word, holding that word's normalized values across topics.
 */
def formatSparkLDAWordOutput(wordTopMat: Matrix, wordMap: Map[Int, String]): scala.Predef.Map[String, Array[Double]] = {
// incoming word top matrix is in column-major order and the columns are unnormalized
val m = wordTopMat.numRows
val n = wordTopMat.numCols
// columnSums(j) is the sum of column j over all m rows; it is the divisor used for that column.
val columnSums: Array[Double] = Range(0, n).map(j => (Range(0, m).map(i => wordTopMat(i, j)).sum)).toArray
// The transposed matrix is flattened and cut into groups of n values; given the column-major note
// above, each group is presumably one row (one word) of the original matrix - TODO confirm.
val wordProbs: Seq[Array[Double]] = wordTopMat.transpose.toArray.grouped(n).toSeq
.map(unnormProbs =>{ case (u, j) => u / columnSums(j) })){ case (topicProbs, wordInd) => (wordMap(wordInd), topicProbs) }).toMap
/**
 * Builds the document-to-topic-mix DataFrame from the per-document topic distributions.
 *
 * The distributions are joined with `documentDictionary` on the document number so that each row is
 * labelled with the document name, the distribution vector is converted to an array, and the array
 * column is exposed under the TopicProbabilityMix name before being handed to `precisionUtility`.
 *
 * @param docTopDist topic distribution vector per document number.
 * @param documentDictionary DataFrame holding at least the DocumentName and DocumentNumber columns.
 * @param sqlContext SQL context; supplies the implicits needed for `toDF`.
 * @param precisionUtility its `castColumn` is applied to the TopicProbabilityMix column; presumably it
 *                         casts the values to the configured floating-point precision - TODO confirm.
 * @return the DataFrame returned by `precisionUtility.castColumn`, with the columns DocumentName and
 *         TopicProbabilityMix selected beforehand.
 */
def formatSparkLDADocTopicOutput(docTopDist: RDD[(Long, Vector)], documentDictionary: DataFrame, sqlContext:
SQLContext, precisionUtility: FloatPointPrecisionUtility):
DataFrame = {
import sqlContext.implicits._
// UDF converting a topic distribution Vector into a plain array.
val topicDistributionToArray = udf((topicDistribution: Vector) => topicDistribution.toArray)
val documentToTopicDistributionDF = docTopDist.toDF(DocumentNumber, TopicProbabilityMix)
// Join on the document number to recover the document name, add the array form of the distribution
// as a temporary column, then rename it back to TopicProbabilityMix in the final projection.
val documentToTopicDistributionArray = documentToTopicDistributionDF
.join(documentDictionary, documentToTopicDistributionDF(DocumentNumber) === documentDictionary(DocumentNumber))
.select(DocumentName, TopicProbabilityMix)
.withColumn(TopicProbabilityMixArray, topicDistributionToArray(documentToTopicDistributionDF(TopicProbabilityMix)))
.selectExpr(s"$DocumentName AS $DocumentName", s"$TopicProbabilityMixArray AS $TopicProbabilityMix")
precisionUtility.castColumn(documentToTopicDistributionArray, TopicProbabilityMix)
/**
 * One input record for the LDA wrapper: a (document, word, count) triple.
 *
 * @param doc document name.
 * @param word word name.
 * @param count count attached to `word` within `doc`; presumably the number of occurrences - TODO confirm.
 */
case class SpotLDAInput(
  doc: String,
  word: String,
  count: Int
) extends Serializable
/**
 * Result of an LDA run.
 *
 * @param docToTopicMix DataFrame of topic mixes per document, as produced by the document-topic formatter.
 * @param wordResults map from word to its array of per-topic values.
 */
case class SpotLDAOutput(
  docToTopicMix: DataFrame,
  wordResults: Map[String, Array[Double]]
)