/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spot.lda

import org.apache.log4j.Logger
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spot.lda.SpotLDAWrapperSchema._
import org.apache.spot.utilities.FloatPointPrecisionUtility
import scala.collection.immutable.Map

/**
* Spark LDA implementation.
* Contains routines for running LDA using the Scala Spark implementation from MLlib:
* 1. Creates a list of unique documents, a list of unique words, and a model based on those two
* 2. Processes the model using Spark LDA
* 3. Reads the Spark LDA results: topic distributions per document (docTopicDist) and word distributions per topic (wordTopicMat)
* 4. Converts wordTopicMat (Matrix) and docTopicDist (RDD) to the appropriate formats (maps) originally specified in LDA-C
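*
* A minimal usage sketch (hypothetical parameter values; assumes a Logger instance in scope and
* FloatPointPrecisionUtility64 as the 64-bit precision implementation):
* {{{
* val corpus: RDD[SpotLDAInput] = sparkSession.sparkContext.parallelize(Seq(
*   SpotLDAInput("192.168.1.1", "word1", 2),
*   SpotLDAInput("192.168.1.2", "word2", 1)))
* val output: SpotLDAOutput = SpotLDAWrapper.runLDA(sparkSession, corpus, topicCount = 20,
*   logger = logger, ldaSeed = None, ldaAlpha = 1.02, ldaBeta = 1.001,
*   ldaOptimizerOption = "em", maxIterations = 20,
*   precisionUtility = FloatPointPrecisionUtility64)
* }}}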
*/
object SpotLDAWrapper {
/**
* Runs Spark LDA and returns a new model.
*
* @param sparkSession the SparkSession
* @param docWordCount RDD of SpotLDAInput records: one (document, word, count) entry per document/word pair (the corpus)
* @param topicCount number of topics to find
* @param logger application logger
* @param ldaSeed LDA seed
* @param ldaAlpha document concentration
* @param ldaBeta topic concentration
* @param ldaOptimizerOption LDA optimizer, em or online
* @param maxIterations maximum number of iterations for the optimizer
* @param precisionUtility FloatPointPrecisionUtility implementation based on user configuration (64 or 32 bit)
* @return SpotLDAOutput containing the per-document topic-mix DataFrame and the per-word topic-distribution map
*/
def runLDA(sparkSession: SparkSession,
docWordCount: RDD[SpotLDAInput],
topicCount: Int,
logger: Logger,
ldaSeed: Option[Long],
ldaAlpha: Double,
ldaBeta: Double,
ldaOptimizerOption: String,
maxIterations: Int,
precisionUtility: FloatPointPrecisionUtility): SpotLDAOutput = {
import sparkSession.implicits._
val docWordCountCache = docWordCount.cache()
// Forcing an action to cache results.
docWordCountCache.count()
// Create a word -> index map for later use
val wordDictionary: Map[String, Int] = {
val words = docWordCountCache
.map({ case SpotLDAInput(doc, word, count) => word })
.distinct
.collect
words.zipWithIndex.toMap
}
val documentDictionary: DataFrame = docWordCountCache
.map({ case SpotLDAInput(doc, word, count) => doc })
.distinct
.zipWithIndex
.toDF(DocumentName, DocumentNumber)
.cache
// Structure corpus so that the index is the docID, values are the vectors of word occurrences in that doc
val ldaCorpus: RDD[(Long, Vector)] =
formatSparkLDAInput(docWordCountCache,
documentDictionary,
wordDictionary,
sparkSession)
docWordCountCache.unpersist()
// Instantiate optimizer based on input
val ldaOptimizer = ldaOptimizerOption match {
case "em" => new EMLDAOptimizer
case "online" => new OnlineLDAOptimizer().setOptimizeDocConcentration(true).setMiniBatchFraction({
val corpusSize = ldaCorpus.count()
if (corpusSize < 2) 0.75
else (0.05 + 1) / corpusSize
})
case _ => throw new IllegalArgumentException(
s"Invalid LDA optimizer $ldaOptimizerOption")
}
logger.info(s"Running Spark LDA with params alpha = $ldaAlpha beta = $ldaBeta " +
s"Max iterations = $maxIterations Optimizer = $ldaOptimizerOption")
// Set LDA params from input args
val lda =
new LDA()
.setK(topicCount)
.setMaxIterations(maxIterations)
.setAlpha(ldaAlpha)
.setBeta(ldaBeta)
.setOptimizer(ldaOptimizer)
// If the caller does not provide a seed (i.e. ldaSeed is empty), LDA is automatically seeded with the hash of its class name
if (ldaSeed.nonEmpty) {
lda.setSeed(ldaSeed.get)
}
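// The model type returned by LDA.run depends on the optimizer:
// EMLDAOptimizer yields a DistributedLDAModel, OnlineLDAOptimizer yields a LocalLDAModel.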
val (wordTopicMat, docTopicDist) = ldaOptimizer match {
case _: EMLDAOptimizer => {
val ldaModel = lda.run(ldaCorpus).asInstanceOf[DistributedLDAModel]
// Get word topic mix, from Spark documentation:
// Inferred topics, where each topic is represented by a distribution over terms.
// This is a matrix of size vocabSize x k, where each column is a topic.
// No guarantees are given about the ordering of the topics.
val wordTopicMat: Matrix = ldaModel.topicsMatrix
// Topic distribution: for each document, return the distribution (vector) over topics for that document, where
// entry i is the fraction of the document that belongs to topic i
val docTopicDist: RDD[(Long, Vector)] = ldaModel.topicDistributions
(wordTopicMat, docTopicDist)
}
case _: OnlineLDAOptimizer => {
val ldaModel = lda.run(ldaCorpus).asInstanceOf[LocalLDAModel]
// Get word topic mix, from Spark documentation:
// Inferred topics, where each topic is represented by a distribution over terms.
// This is a matrix of size vocabSize x k, where each column is a topic.
// No guarantees are given about the ordering of the topics.
val wordTopicMat: Matrix = ldaModel.topicsMatrix
// Topic distribution: for each document, return the distribution (vector) over topics for that document, where
// entry i is the fraction of the document that belongs to topic i
val docTopicDist: RDD[(Long, Vector)] = ldaModel.topicDistributions(ldaCorpus)
(wordTopicMat, docTopicDist)
}
}
// Create doc results from vector: convert docID back to string, convert vector of probabilities to array
val docToTopicMixDF =
formatSparkLDADocTopicOutput(docTopicDist, documentDictionary, sparkSession, precisionUtility)
documentDictionary.unpersist()
// Create word results from matrix: convert matrix to sequence, wordIDs back to strings, sequence of
// probabilities to array
val revWordMap: Map[Int, String] = wordDictionary.map(_.swap)
val wordResults = formatSparkLDAWordOutput(wordTopicMat, revWordMap)
// Create output object
SpotLDAOutput(docToTopicMixDF, wordResults)
}

/**
* Formats input data for LDA algorithm
*
* @param docWordCount RDD of SpotLDAInput records: one (document, word, count) entry per document/word pair (the corpus)
* @param documentDictionary DataFrame with the distinct documents and their ids
* @param wordDictionary immutable Map of the distinct words to their ids
* @param sparkSession the SparkSession
* @return RDD of (document id, sparse vector of word counts in that document), the LDA corpus
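*
* Illustrative sketch (hypothetical dictionaries): with wordDictionary = Map("word1" -> 0) and a
* documentDictionary row ("192.168.1.1", 0), SpotLDAInput("192.168.1.1", "word1", 3) becomes
* (0L, Vectors.sparse(1, Seq((0, 3.0)))).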
*/
def formatSparkLDAInput(docWordCount: RDD[SpotLDAInput],
documentDictionary: DataFrame,
wordDictionary: Map[String, Int],
sparkSession: SparkSession): RDD[(Long, Vector)] = {
import sparkSession.implicits._
val getWordId = udf((word: String) => wordDictionary(word))
val docWordCountDF = docWordCount
.map({ case SpotLDAInput(doc, word, count) => (doc, word, count) })
.toDF(DocumentName, WordName, WordNameWordCount)
// Convert SpotLDAInput into the desired format for Spark LDA: (doc, word, count) -> word count per doc, where the
// RDD is indexed by DocID
val wordCountsPerDocDF = docWordCountDF
.join(documentDictionary, docWordCountDF(DocumentName) === documentDictionary(DocumentName))
.drop(documentDictionary(DocumentName))
.withColumn(WordNumber, getWordId(docWordCountDF(WordName)))
.drop(WordName)
val wordCountsPerDoc: RDD[(Long, Iterable[(Int, Double)])]
= wordCountsPerDocDF
.select(DocumentNumber, WordNumber, WordNameWordCount)
.rdd
.map({ case Row(documentId: Long, wordId: Int, wordCount: Int) => (documentId, (wordId, wordCount.toDouble)) })
.groupByKey
// Number of distinct words in the whole corpus (vocabulary size), used as the sparse vector dimension
val numUniqueWords = wordDictionary.size
val ldaInput: RDD[(Long, Vector)] = wordCountsPerDoc
.mapValues(vs => Vectors.sparse(numUniqueWords, vs.toSeq))
ldaInput
}

/**
* Format LDA output topicMatrix for spot-ml scoring
*
* @param wordTopMat LDA model topicMatrix
* @param wordMap immutable Map of word ids back to the distinct words
* @return Map of each word to its normalized probability-per-topic array
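*
* Illustrative sketch (hypothetical 2-word, 2-topic values):
* {{{
* // column-major 2 x 2 matrix [[1.0, 2.0], [3.0, 2.0]]
* val mat = Matrices.dense(2, 2, Array(1.0, 3.0, 2.0, 2.0))
* formatSparkLDAWordOutput(mat, Map(0 -> "w0", 1 -> "w1"))
* // => Map("w0" -> Array(0.25, 0.5), "w1" -> Array(0.75, 0.5)): each topic column sums to 1
* }}}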
*/
def formatSparkLDAWordOutput(wordTopMat: Matrix, wordMap: Map[Int, String]): scala.Predef.Map[String, Array[Double]] = {
// incoming word top matrix is in column-major order and the columns are unnormalized
val m = wordTopMat.numRows
val n = wordTopMat.numCols
val columnSums: Array[Double] = Range(0, n).map(j => Range(0, m).map(i => wordTopMat(i, j)).sum).toArray
val wordProbs: Seq[Array[Double]] = wordTopMat.transpose.toArray.grouped(n).toSeq
.map(unnormProbs => unnormProbs.zipWithIndex.map({ case (u, j) => u / columnSums(j) }))
wordProbs.zipWithIndex.map({ case (topicProbs, wordInd) => (wordMap(wordInd), topicProbs) }).toMap
}

/**
* Format LDA output topicDistribution for spot-ml scoring
*
* @param docTopDist LDA model topicDistribution
* @param documentDictionary DataFrame with the distinct documents and their ids
* @param sparkSession the SparkSession
* @param precisionUtility FloatPointPrecisionUtility implementation based on user configuration (64 or 32 bit)
* @return DataFrame of document names and their topic-probability mix, cast to the configured floating point precision
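*
* Illustrative sketch (hypothetical values): an input record (0L, Vectors.dense(0.2, 0.8)) joined against
* a documentDictionary row ("192.168.1.1", 0) yields the row ("192.168.1.1", [0.2, 0.8]), whose array is
* cast to Float when the 32-bit precision utility is configured.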
*/
def formatSparkLDADocTopicOutput(docTopDist: RDD[(Long, Vector)],
documentDictionary: DataFrame,
sparkSession: SparkSession,
precisionUtility: FloatPointPrecisionUtility): DataFrame = {
import sparkSession.implicits._
val topicDistributionToArray = udf((topicDistribution: Vector) => topicDistribution.toArray)
val documentToTopicDistributionDF = docTopDist.toDF(DocumentNumber, TopicProbabilityMix)
val documentToTopicDistributionArray = documentToTopicDistributionDF
.join(documentDictionary, documentToTopicDistributionDF(DocumentNumber) === documentDictionary(DocumentNumber))
.drop(documentDictionary(DocumentNumber))
.drop(documentToTopicDistributionDF(DocumentNumber))
.select(DocumentName, TopicProbabilityMix)
.withColumn(TopicProbabilityMixArray, topicDistributionToArray(documentToTopicDistributionDF(TopicProbabilityMix)))
.selectExpr(s"$DocumentName AS $DocumentName", s"$TopicProbabilityMixArray AS $TopicProbabilityMix")
precisionUtility.castColumn(documentToTopicDistributionArray, TopicProbabilityMix)
}

case class SpotLDAInput(doc: String, word: String, count: Int) extends Serializable
case class SpotLDAOutput(docToTopicMix: DataFrame, wordResults: Map[String, Array[Double]])
}