blob: 8db754ef2f173f4234d6bdc7e5ac371fb6514577 [file] [log] [blame]
package org.template.similarproduct
import io.prediction.controller.PAlgorithm
import io.prediction.controller.Params
import io.prediction.controller.IPersistentModel
import io.prediction.controller.IPersistentModelLoader
import io.prediction.data.storage.BiMap
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.linalg.distributed.MatrixEntry
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix
import grizzled.slf4j.Logger
import scala.collection.mutable.PriorityQueue
/** Parameters for [[DIMSUMAlgorithm]].
  *
  * @param threshold value handed to `RowMatrix.columnSimilarities` during
  *                  training
  */
case class DIMSUMAlgorithmParams(threshold: Double) extends Params
/** Trained model holding item-to-item similarities together with the
  * lookup tables needed to translate between String item IDs and the Int
  * indices used during training.
  *
  * @param similarities     sparse vector of similarity scores per item,
  *                         keyed by item Int index
  * @param itemStringIntMap mapping from item String ID to Int index
  * @param items            item data keyed by item Int index
  */
class DIMSUMModel(
  val similarities: RDD[(Int, SparseVector)],
  val itemStringIntMap: BiMap[String, Int],
  val items: Map[Int, Item]
) extends IPersistentModel[DIMSUMAlgorithmParams] {

  // Reverse lookup (Int index -> String ID); rebuilt lazily on first use and
  // excluded from serialization.
  @transient lazy val itemIntStringMap = itemStringIntMap.inverse

  /** Writes the three model parts as Spark object files under `/tmp/<id>/`.
    *
    * @param id     model instance id, used as the directory name
    * @param params algorithm parameters (not used when saving)
    * @param sc     context used to turn the in-memory maps into RDDs
    * @return always `true`
    */
  def save(id: String, params: DIMSUMAlgorithmParams,
    sc: SparkContext): Boolean = {
    similarities.saveAsObjectFile(s"/tmp/${id}/similarities")

    // The maps are plain local objects, so each is wrapped in a
    // single-element RDD in order to be written as an object file.
    val idMapRdd = sc.parallelize(Seq(itemStringIntMap))
    idMapRdd.saveAsObjectFile(s"/tmp/${id}/itemStringIntMap")

    val itemsRdd = sc.parallelize(Seq(items))
    itemsRdd.saveAsObjectFile(s"/tmp/${id}/items")

    true
  }

  /** Summary with the size and the first two entries of each part.
    * Note that this runs `count()` and `take(2)` on the similarities RDD.
    */
  override def toString = {
    Seq(
      s"similarities: [${similarities.count()}]",
      s"(${similarities.take(2).toList}...)",
      s" itemStringIntMap: [${itemStringIntMap.size}]",
      s"(${itemStringIntMap.take(2).toString}...)]",
      s" items: [${items.size}]",
      s"(${items.take(2).toString}...)]"
    ).mkString
  }
}
/** Loader that rebuilds a [[DIMSUMModel]] from the object files written by
  * `DIMSUMModel.save` under `/tmp/<id>/`.
  */
object DIMSUMModel
  extends IPersistentModelLoader[DIMSUMAlgorithmParams, DIMSUMModel] {

  /** Reads the persisted model parts back.
    *
    * @param id     model instance id, used as the directory name
    * @param params algorithm parameters (not used when loading)
    * @param sc     Spark context; must be defined, `sc.get` fails otherwise
    */
  def apply(id: String, params: DIMSUMAlgorithmParams,
    sc: Option[SparkContext]) = {
    val ctx = sc.get

    // The similarities stay distributed; the two maps were saved as
    // single-element RDDs, so `first` recovers the original object.
    val loadedSimilarities =
      ctx.objectFile[(Int, SparseVector)](s"/tmp/${id}/similarities")
    val loadedIdMap =
      ctx.objectFile[BiMap[String, Int]](s"/tmp/${id}/itemStringIntMap").first
    val loadedItems =
      ctx.objectFile[Map[Int, Item]](s"/tmp/${id}/items").first

    new DIMSUMModel(
      similarities = loadedSimilarities,
      itemStringIntMap = loadedIdMap,
      items = loadedItems)
  }
}
/** Similar-item algorithm built on `RowMatrix.columnSimilarities`.
  *
  * `train` turns view events into a user-by-item matrix and computes
  * item-to-item similarity scores; `predict` sums the scores of the items
  * similar to the query items and returns the best-scoring ones.
  *
  * @param ap algorithm parameters; `ap.threshold` is passed to
  *           `columnSimilarities`
  */
class DIMSUMAlgorithm(val ap: DIMSUMAlgorithmParams)
  extends PAlgorithm[PreparedData, DIMSUMModel, Query, PredictedResult] {

  @transient lazy val logger = Logger[this.type]

  /** Builds the similarity model from the prepared users, items and view
    * events.
    *
    * @param data prepared training data
    * @return model with per-item similarity vectors and the ID lookup tables
    */
  def train(data: PreparedData): DIMSUMModel = {

    // create User and item's String ID to integer index BiMap
    val userStringIntMap = BiMap.stringInt(data.users.keys)
    val itemStringIntMap = BiMap.stringInt(data.items.keys)

    // collect Item as Map and convert ID to Int index
    val items: Map[Int, Item] = data.items.map { case (id, item) =>
      (itemStringIntMap(id), item)
    }.collectAsMap.toMap
    val itemCount = items.size

    // each row is a sparse vector of rated items by this user
    val rows: RDD[Vector] = data.viewEvents
      .map { r =>
        // Convert user and item String IDs to Int index for MLlib
        val uindex = userStringIntMap.getOrElse(r.user, -1)
        val iindex = itemStringIntMap.getOrElse(r.item, -1)

        if (uindex == -1)
          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
            + " to Int index.")

        if (iindex == -1)
          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
            + " to Int index.")

        // every view event contributes the same value of 1.0
        (uindex, (iindex, 1.0))
      }.filter { case (uindex, (iindex, _)) =>
        // keep events with valid user and item index
        (uindex != -1) && (iindex != -1)
      }.groupByKey().map { case (u, ir) =>
        // de-duplicate if user has multiple events on same item
        val irDedup: Map[Int, Double] = ir.groupBy(_._1) // group By item index
          .map { case (i, irGroup) =>
            // same item index group of (item index, rating value) tuple
            val r = irGroup.reduce { (a, b) =>
              // Simply keep one copy.
              a
              // You may modify here to reduce same item tuple differently,
              // such as summing all values:
              //(a._1, (a._2 + b._2))
            }
            (i, r._2)
          }

        // NOTE: index array must be strictly increasing for Sparse Vector
        val irSorted = irDedup.toArray.sortBy(_._1)
        val indexes = irSorted.map(_._1)
        val values = irSorted.map(_._2)
        Vectors.sparse(itemCount, indexes, values)
      }

    val mat = new RowMatrix(rows)
    val scores = mat.columnSimilarities(ap.threshold)

    // Mirror every (i, j) entry as (j, i) so that the row of either item
    // index contains the pair.
    val reversedEntries: RDD[MatrixEntry] = scores.entries
      .map(e => new MatrixEntry(e.j, e.i, e.value))
    val combined = new CoordinateMatrix(scores.entries.union(reversedEntries))

    // One (item index, sparse similarity vector) pair per matrix row.
    val similarities = combined.toIndexedRowMatrix.rows
      .map( row => (row.index.toInt, row.vector.asInstanceOf[SparseVector]))

    new DIMSUMModel(
      similarities = similarities,
      itemStringIntMap = itemStringIntMap,
      items = items
    )
  }

  /** Recommends items similar to the items listed in the query.
    *
    * Candidates are filtered by the optional white list, black list and
    * categories, and the query items themselves are excluded. Scores of a
    * candidate reached from several query items are summed.
    *
    * @param model trained model
    * @param query items to find neighbours for, number of results wanted and
    *              optional filters
    * @return at most `query.num` items, highest aggregated score first
    */
  def predict(model: DIMSUMModel, query: Query): PredictedResult = {
    // convert the white and black list items to Int index
    val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
      set.map(model.itemStringIntMap.get(_)).flatten
    )
    val blackList: Option[Set[Int]] = query.blackList.map ( set =>
      set.map(model.itemStringIntMap.get(_)).flatten
    )

    val queryList: Set[Int] = query.items.map(model.itemStringIntMap.get(_))
      .flatten.toSet

    val indexScores = query.items.flatMap { iid =>
      model.itemStringIntMap.get(iid).map { itemInt =>
        val simsSeq = model.similarities.lookup(itemInt)
        if (simsSeq.isEmpty) {
          logger.info(s"No similar items found for ${iid}.")
          Array.empty[(Int, Double)]
        } else {
          val sims = simsSeq.head
          sims.indices.zip(sims.values).filter { case (i, v) =>
            whiteList.map(_.contains(i)).getOrElse(true) &&
            blackList.map(!_.contains(i)).getOrElse(true) &&
            // discard items in query as well
            (!queryList.contains(i)) &&
            // filter categories
            query.categories.map { cat =>
              model.items(i).categories.map { itemCat =>
                // keep this item if it has overlapping categories with the query
                !(itemCat.toSet.intersect(cat).isEmpty)
              }.getOrElse(false) // discard this item if it has no categories
            }.getOrElse(true)
          }
        }
      }.getOrElse {
        logger.info(s"No similar items for unknown item ${iid}.")
        Array.empty[(Int, Double)]
      }
    }

    // sum the scores collected for the same candidate item
    val aggregatedScores = indexScores.groupBy(_._1)
      .mapValues(_.foldLeft[Double](0)( (b,a) => b + a._2))
      .toList

    // reversed so that the "smallest" element is the highest score
    val ord = Ordering.by[(Int, Double), Double](_._2).reverse

    val itemScores = getTopN(aggregatedScores, query.num)(ord)
      .map{ case (i, s) =>
        new ItemScore(
          item = model.itemIntStringMap(i),
          score = s
        )
      }.toArray

    new PredictedResult(itemScores)
  }

  /** Returns the `n` smallest elements of `s` according to `ord`, in
    * ascending `ord` order.
    *
    * A bounded priority queue keeps the current best `n` elements; its head
    * is the largest of them, so a new element only enters when it is smaller
    * than the head.
    *
    * @param s   elements to select from
    * @param n   maximum number of elements to return; a value of zero or
    *            less yields an empty result
    * @param ord ordering that defines "smallest"
    */
  private
  def getTopN[T](s: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
    if (n <= 0) {
      // Without this guard the queue never fills, and `q.head` would be
      // called on an empty queue for the first element of `s`.
      Seq.empty[T]
    } else {
      val q = PriorityQueue[T]()

      for (x <- s) {
        if (q.size < n)
          q.enqueue(x)
        else {
          // q is full
          if (ord.compare(x, q.head) < 0) {
            q.dequeue()
            q.enqueue(x)
          }
        }
      }

      q.dequeueAll.toSeq.reverse
    }
  }
}