package io.prediction.algorithms.scalding.mahout.itemrec
import com.twitter.scalding._
import io.prediction.commons.filepath.{ DataFile, AlgoFile }
import io.prediction.commons.scalding.modeldata.ItemRecScores
import cascading.pipe.joiner.LeftJoin
/**
 * Source:
 * predicted.tsv (Mahout ItemRec output), itemsIndex.tsv and usersIndex.tsv
 *
 * Sink:
 * ItemRecScores model data
 *
 * Description:
 * Converts Mahout ItemRec predictions into the top --numRecommendations items
 * per user and writes them as ItemRecScores model data.
 *
 * Required args:
 * --dbType: <string> modeldata DB type (eg. mongodb) (see --dbHost, --dbPort)
 * --dbName: <string> (eg. predictionio_modeldata)
 * --hdfsRoot: <string>. Root directory of the HDFS
 * --appid: <int>
 * --engineid: <int>
 * --algoid: <int>
 * --modelSet: <boolean> (true/false). flag indicating which model set the scores are written to
 * --numRecommendations: <int>. number of recommendations to be generated per user
 *
 * Optional args:
 * --dbHost: <string> (eg. "")
 * --dbPort: <int> (eg. 27017)
 * --evalid: <int>. enables offline evaluation mode when specified
 * --debug: <String>. "test" - for testing purposes
 *
 * Example:
 */
class ModelConstructor(args: Args) extends Job(args) {
/**
 * parse args
 */
val dbTypeArg = args("dbType")
val dbNameArg = args("dbName")
// optional (see header); collected as lists
val dbHostArg = args.list("dbHost")
val dbPortArg = args.list("dbPort") map (_.toInt)
val hdfsRootArg = args("hdfsRoot")
val appidArg = args("appid").toInt
val engineidArg = args("engineid").toInt
val algoidArg = args("algoid").toInt
val evalidArg = args.optional("evalid") map (_.toInt)
val OFFLINE_EVAL = evalidArg.isDefined // offline eval mode
val debugArg = args.list("debug")
val DEBUG_TEST = debugArg.contains("test") // test mode
val modelSetArg = args("modelSet").toBoolean
// maximum number of items kept per user in the output
val numRecommendationsArg = args("numRecommendations").toInt
/**
 * source
 */
// Mahout ItemRec output: one line per user index, recommendations in the 'predicted field
val predicted = Tsv(AlgoFile(hdfsRootArg, appidArg, engineidArg, algoidArg, evalidArg, "predicted.tsv"), ('uindex, 'predicted)).read

// columns: iindex, iid, itypes, starttime, endtime ("PIO_NONE" means no end time)
val itemsIndex = Tsv(DataFile(hdfsRootArg, appidArg, engineidArg, algoidArg, evalidArg, "itemsIndex.tsv")).read
  .mapTo((0, 1, 2, 3, 4) -> ('iindexI, 'iidI, 'itypesI, 'starttimeI, 'endtimeI)) { fields: (String, String, String, Long, String) =>
    val (iindex, iid, itypes, starttime, endtime) = fields // itypes are comma-separated String

    val endtimeOpt: Option[Long] = endtime match {
      case "PIO_NONE" => None
      case x: String =>
        try {
          Some(x.toLong)
        } catch {
          // Throw instead of assert(false, ...): assertions can be elided at compile
          // time, which would let a malformed end time pass through silently.
          case e: NumberFormatException =>
            throw new IllegalArgumentException(s"Failed to convert ${x} to Long. Exception: " + e, e)
        }
    }

    (iindex, iid, itypes.split(",").toList, starttime, endtimeOpt)
  }

// columns: uindex, uid
val usersIndex = Tsv(DataFile(hdfsRootArg, appidArg, engineidArg, algoidArg, evalidArg, "usersIndex.tsv")).read
  .mapTo((0, 1) -> ('uindexU, 'uidU)) { fields: (String, String) =>
    fields
  }
/**
 * sink
 */
// model data store the per-user recommendation lists are written to
val itemRecScoresSink = ItemRecScores(
  dbType = dbTypeArg,
  dbName = dbNameArg,
  dbHost = dbHostArg,
  dbPort = dbPortArg,
  algoid = algoidArg,
  modelset = modelSetArg)
/**
 * computation
 */
// convert to (uindex, iindex, rating) format
val predictedRating = predicted.flatMap('predicted -> ('iindex, 'rating)) { data: String => parsePredictedData(data) }
  .project('uindex, 'iindex, 'rating)

// Map indices back to item/user ids, collect each user's items sorted by rating
// (highest first), keep at most numRecommendationsArg of them, and write to the sink.
predictedRating
  .joinWithSmaller('iindex -> 'iindexI, itemsIndex)
  .joinWithSmaller('uindex -> 'uindexU, usersIndex)
  .project('uidU, 'iidI, 'rating, 'itypesI)
  .groupBy('uidU) { _.sortBy('rating).reverse.toList[(String, Double, List[String])](('iidI, 'rating, 'itypesI) -> 'iidsList) }
  .mapTo(('uidU, 'iidsList) -> ('uidU, 'iidsList)) { fields: (String, List[(String, Double, List[String])]) =>
    val (uidU, iidsList) = fields
    (uidU, iidsList.take(numRecommendationsArg))
  }
  .then(itemRecScoresSink.writeData('uidU, 'iidsList, algoidArg, modelSetArg) _)
/**
 * Parses one line of Mahout ItemRec output into (item index, rating) pairs.
 *
 * Mahout ItemRec output format:
 * [24:3.2] => ("24", 3.2)
 * [8:2.5,0:2.5] => ("8", 2.5), ("0", 2.5)
 *
 * @param data bracketed, comma-separated list of `item:rating` entries
 * @return the entries in input order; empty for "[]"
 * @throws IllegalArgumentException if an entry has no ':' or its rating is not a number
 */
def parsePredictedData(data: String): List[(String, Double)] = {
  // strip the enclosing brackets; skip empty segments so "[]" yields no entries
  val entries = data.stripPrefix("[").stripSuffix("]").split(",").toList.filter(_.nonEmpty)
  entries.map { ratingData =>
    val ratingDataArray = ratingData.split(":")
    if (ratingDataArray.length < 2)
      throw new IllegalArgumentException(s"Missing rating value in entry: ${ratingData}")
    val item = ratingDataArray(0)
    val rating: Double = try {
      ratingDataArray(1).toDouble
    } catch {
      // Throw (not assert, which can be elided) and report the raw entry text.
      case e: NumberFormatException =>
        throw new IllegalArgumentException(
          s"Cannot convert rating value of item ${item} to double: ${ratingData}. Exception: " + e, e)
    }
    (item, rating)
  }
}