blob: c0d9ec4ccef84576af29f9469271d89eb6b99358 [file] [log] [blame]
package io.prediction.algorithms.graphchi.itemrec
import grizzled.slf4j.Logger
import breeze.linalg._
import com.twitter.scalding.Args
import scala.io.Source
import scala.collection.mutable.PriorityQueue
import io.prediction.algorithms.graphchi.itemrec.MatrixMarketReader
import io.prediction.commons.Config
import io.prediction.commons.modeldata.{ ItemRecScore }
/**
* Input files:
* - usersIndex.tsv (uindex uid)
* - itemsIndex.tsv (iindex iid itypes) (only recommend items in this list)
* - seen.csv (uindex, iindex) (For unseenOnly filtering)
* - ratings.mm_U.mm User x feature matrix generated by GraphChi
* - ratings.mm_V.mm Item x feature matrix generated by GraphChi
*
*/
object GraphChiModelConstructor {
/* global */
val logger = Logger(GraphChiModelConstructor.getClass)
//println(logger.isInfoEnabled)
val commonsConfig = new Config
// argument of this job
case class JobArg(
val inputDir: String,
val appid: Int,
val algoid: Int,
val evalid: Option[Int],
val modelSet: Boolean,
val unseenOnly: Boolean,
val numRecommendations: Int)
def main(cmdArgs: Array[String]) {
logger.info("Running model constructor for GraphChi ...")
logger.info(cmdArgs.mkString(","))
/* get arg */
val args = Args(cmdArgs)
val arg = JobArg(
inputDir = args("inputDir"),
appid = args("appid").toInt,
algoid = args("algoid").toInt,
evalid = args.optional("evalid") map (x => x.toInt),
modelSet = args("modelSet").toBoolean,
unseenOnly = args("unseenOnly").toBoolean,
numRecommendations = args("numRecommendations").toInt
)
/* run job */
modelCon(arg)
cleanUp(arg)
}
def modelCon(arg: JobArg) = {
// NOTE: if OFFLINE_EVAL, write to training modeldata and use evalid as appid
val OFFLINE_EVAL = (arg.evalid != None)
val modeldataDb = if (!OFFLINE_EVAL)
commonsConfig.getModeldataItemRecScores
else
commonsConfig.getModeldataTrainingItemRecScores
val appid = if (OFFLINE_EVAL) arg.evalid.get else arg.appid
// user index file
// uindex -> uid
val usersMap: Map[Int, String] = Source.fromFile(s"${arg.inputDir}usersIndex.tsv").getLines()
.map[(Int, String)] { line =>
val (uindex, uid) = try {
val data = line.split("\t")
(data(0).toInt, data(1))
} catch {
case e: Exception => {
throw new RuntimeException(s"Cannot get user index and uid in line: ${line}. ${e}")
}
}
(uindex, uid)
}.toMap
case class ItemData(
val iid: String,
val itypes: Seq[String])
// item index file (iindex iid itypes)
// iindex -> ItemData
val itemsMap: Map[Int, ItemData] = Source.fromFile(s"${arg.inputDir}itemsIndex.tsv")
.getLines()
.map[(Int, ItemData)] { line =>
val (iindex, item) = try {
val fields = line.split("\t")
val itemData = ItemData(
iid = fields(1),
itypes = fields(2).split(",").toList
)
(fields(0).toInt, itemData)
} catch {
case e: Exception => {
throw new RuntimeException(s"Cannot get item info in line: ${line}. ${e}")
}
}
(iindex, item)
}.toMap
// ratings file (for unseen filtering)
val seenSet: Map[Int, Set[Int]] = if (arg.unseenOnly) {
Source.fromFile(s"${arg.inputDir}seen.csv")
.getLines()
.map[(Int, Int)] { line =>
val (u, i) = try {
val fields = line.split(",")
// u, i
(fields(0).toInt, fields(1).toInt)
} catch {
case e: Exception => throw new RuntimeException(s"Cannot get user and item index from this line: ${line}. ${e}")
}
(u, i)
}.toSeq.groupBy(_._1)
.mapValues(_.map(_._2).toSet)
} else {
Map() // empty map
}
// feature x user matrix
val userMatrix = MatrixMarketReader.readDense(s"${arg.inputDir}ratings.mm_U.mm")
// feature x item matrix
val itemMatrix = MatrixMarketReader.readDense(s"${arg.inputDir}ratings.mm_V.mm")
val allUindex = for (uindex <- 1 to userMatrix.cols if usersMap.contains(uindex)) yield (uindex, userMatrix(::, uindex - 1), seenSet.getOrElse(uindex, Set()))
val validIindex = for (iindex <- 1 to itemMatrix.cols if validItemFilter(true, iindex, itemsMap)) yield (iindex)
val allScores = allUindex.par
.foreach { fields =>
val (uindex, userVector, seenItemSet) = fields
val q = new TopNQueue[(Int, Double)](arg.numRecommendations)(ScoreOrdering.reverse)
val candidateIindex = validIindex.toIterator
.filter(iindex => unseenItemFilter(arg.unseenOnly, iindex, seenItemSet))
while (candidateIindex.hasNext) {
val iindex = candidateIindex.next()
val score = userVector dot itemMatrix(::, iindex - 1)
q.add((iindex, score))
}
val topScores = q.toSeq
modeldataDb.insert(ItemRecScore(
uid = usersMap(uindex),
iids = topScores.map(x => itemsMap(x._1).iid),
scores = topScores.map(_._2),
itypes = topScores.map(x => itemsMap(x._1).itypes),
appid = appid,
algoid = arg.algoid,
modelset = arg.modelSet))
}
}
def unseenItemFilter(enable: Boolean, iindex: Int, seenSet: Set[Int]): Boolean = {
if (enable) (!seenSet(iindex)) else true
}
def validItemFilter(enable: Boolean, iindex: Int, validMap: Map[Int, Any]): Boolean = {
if (enable) validMap.contains(iindex) else true
}
def cleanUp(arg: JobArg) = {
}
object ScoreOrdering extends Ordering[(Int, Double)] {
override def compare(a: (Int, Double), b: (Int, Double)) = a._2 compare b._2
}
def getTopN[T](s: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
val q = PriorityQueue()
for (x <- s) {
if (q.size < n)
q.enqueue(x)
else {
// q is full
if (ord.compare(x, q.head) < 0) {
q.dequeue()
q.enqueue(x)
}
}
}
q.dequeueAll.toSeq.reverse
}
class TopNQueue[T](val n: Int)(implicit ord: Ordering[T]) {
val q = PriorityQueue[T]()
def add(x: T) = {
if (q.size < n)
q.enqueue(x)
else {
// q is full
if (ord.compare(x, q.head) < 0) {
q.dequeue()
q.enqueue(x)
}
}
}
def toSeq[T] = {
q.dequeueAll.toSeq.reverse
}
}
}