blob: 2f9c066c9fd13e89a9122a8e753c4c507d0251cb [file] [log] [blame]
package io.prediction.commons.modeldata.mongodb
import io.prediction.commons.Config
import io.prediction.commons.MongoUtils._
import io.prediction.commons.modeldata.{ ItemRecScore, ItemRecScores }
import io.prediction.commons.settings.{ Algo, App, OfflineEval }
import com.mongodb.casbah.Imports._
/** MongoDB implementation of ItemRecScores. */
class MongoItemRecScores(cfg: Config, db: MongoDB) extends ItemRecScores with MongoModelData {
val config = cfg
val mongodb = db
/** Indices and hints. */
val queryIndex = MongoDBObject("uid" -> 1)
def getByUid(uid: String)(implicit app: App, algo: Algo, offlineEval: Option[OfflineEval] = None): Option[ItemRecScore] = {
val modelset = offlineEval map { _ => false } getOrElse algo.modelset
val itemRecScoreColl = db(collectionName(, modelset))
itemRecScoreColl.ensureIndex(queryIndex) // not needed here. it's called in after(), just safety measure in case after() is not called
itemRecScoreColl.findOne(MongoDBObject("uid" -> idWithAppid(, uid))).map(dbObjToItemRecScore(_,
def getTopNIids(uid: String, n: Int, itypes: Option[Seq[String]])(implicit app: App, algo: Algo, offlineEval: Option[OfflineEval] = None): Iterator[String] = {
val modelset = offlineEval map { _ => false } getOrElse algo.modelset
val itemRecScoreColl = db(collectionName(, modelset))
itemRecScoreColl.ensureIndex(queryIndex) // not needed here. it's called in after(), just safety measure in case after() is not called
itemRecScoreColl.findOne(MongoDBObject("uid" -> idWithAppid(, uid))).map(dbObjToItemRecScore(_, {
x: ItemRecScore =>
val iids = { s =>
val iidsAndItypes = // List( (iid, Set(itypes of this iid)), ... )
val itypesSet: Set[String] = s.toSet // query itypes Set
val itypesSetSize = itypesSet.size
iidsAndItypes.filter {
case (iid, iiditypes) =>
// if there are some elements in s existing in iiditypes, then s.diff(iiditypes) size will be < original size of s
// it means itypes match the item
(itypesSet.diff(iiditypes).size < itypesSetSize)
}.map(_._1) // only return the iid
}.getOrElse {
val topNIids = if (n == 0) iids else iids.take(n)
def getTopNIidsAndScores(uid: String, n: Int,
itypes: Option[Seq[String]])(implicit app: App, algo: Algo,
offlineEval: Option[OfflineEval] = None): Seq[(String, Double)] = {
val modelset = offlineEval map { _ => false } getOrElse algo.modelset
val itemRecScoreColl = db(collectionName(, modelset))
"uid" -> idWithAppid(, uid))).map(
dbObjToItemRecScore(_, {
x: ItemRecScore =>
val iids = { s =>
val zippedIids = (x.iids, x.scores, x.itypes).zipped.toSeq
val itypesSet: Set[String] = s.toSet // query itypes Set
val itypesSetSize = itypesSet.size
zippedIids.filter { z =>
// if there are some elements in s existing in iiditypes, then
// s.diff(iiditypes) size will be < original size of s
// it means itypes match the item
(itypesSet.diff(z._3.toSet).size < itypesSetSize)
}.map(z => (z._1, z._2)) // only return the iid
}.getOrElse {
if (n == 0) iids else iids.take(n)
} getOrElse {
Seq[(String, Double)]()
def insert(itemRecScore: ItemRecScore) = {
val id = new ObjectId
val itemRecObj = MongoDBObject(
"_id" -> id,
"uid" -> idWithAppid(itemRecScore.appid, itemRecScore.uid),
"iids" -> => idWithAppid(itemRecScore.appid, i)),
"scores" -> itemRecScore.scores,
"itypes" -> itemRecScore.itypes,
"algoid" -> itemRecScore.algoid,
"modelset" -> itemRecScore.modelset
db(collectionName(itemRecScore.algoid, itemRecScore.modelset))
itemRecScore.copy(id = Some(id))
* Insert ItemRecScore(s) and return them with real IDs.
* This method uses the Algo ID and model set of the first
* ItemRecScore in the sequence as the collection name.
def insert(itemRecScores: Seq[ItemRecScore]) = {
val size = itemRecScores.size
if (size == 0) Seq[ItemRecScore]()
else {
val algoid = itemRecScores(0).algoid
val modelset = itemRecScores(0).modelset
val ids = Seq.fill(itemRecScores.size)(new ObjectId)
val itemRecObjsAndIds =
val itemRecObjs = => MongoDBObject(
"_id" -> t._2,
"uid" -> idWithAppid(t._1.appid, t._1.uid),
"iids" -> => idWithAppid(t._1.appid, i)),
"scores" -> t._1.scores,
"itypes" -> t._1.itypes,
"algoid" -> t._1.algoid,
"modelset" -> t._1.modelset))
db(collectionName(algoid, modelset)).insert(itemRecObjs: _*) => t._1.copy(id = Some(t._2)))
def deleteByAlgoid(algoid: Int) = {
db(collectionName(algoid, true)).drop()
db(collectionName(algoid, false)).drop()
def deleteByAlgoidAndModelset(algoid: Int, modelset: Boolean) = {
db(collectionName(algoid, modelset)).drop()
def existByAlgo(algo: Algo) = {
db.collectionExists(collectionName(, algo.modelset)) && db(collectionName(, algo.modelset)).find().hasNext
override def after(algoid: Int, modelset: Boolean) = {
val coll = db(collectionName(algoid, modelset))
/** Private mapping function to map DB Object to ItemRecScore object */
private def dbObjToItemRecScore(dbObj: DBObject, appid: Int) = {
uid =[String]("uid").drop(appid.toString.length + 1),
iids = mongoDbListToListOfString([MongoDBList]("iids")).map(_.drop(appid.toString.length + 1)),
scores = mongoDbListToListOfDouble([MongoDBList]("scores")),
itypes = mongoDbListToListofListOfString([MongoDBList]("itypes")),
appid = appid,
algoid =[Int]("algoid"),
modelset =[Boolean]("modelset"),
id = Some([ObjectId]("_id"))
class MongoItemRecScoreIterator(it: MongoCursor, appid: Int) extends Iterator[ItemRecScore] {
def hasNext = it.hasNext
def next = dbObjToItemRecScore(, appid)