blob: 86bbaee8a3a82c4204051477cf2da923917abe0f [file] [log] [blame]
package io.prediction.algorithms.scalding.mahout.itemsim
import com.twitter.scalding._
import io.prediction.commons.filepath.{DataFile, AlgoFile}
import io.prediction.commons.scalding.modeldata.ItemSimScores
/**
* Source:
*
* Sink:
*
* Description:
*
* Required args:
* --dbType: <string> modeldata DB type (eg. mongodb) (see --dbHost, --dbPort)
* --dbName: <string> (eg. predictionio_modeldata)
*
* --hdfsRoot: <string>. Root directory of the HDFS
*
* --appid: <int>
* --engineid: <int>
* --algoid: <int>
* --modelSet: <boolean> (true/false). flag to indicate which set
*
* --numSimilarItems: <int>. number of similar items to be generated
*
* Optionsl args:
* --dbHost: <string> (eg. "127.0.0.1")
* --dbPort: <int> (eg. 27017)
*
* --evalid: <int>. Offline Evaluation if evalid is specified
* --debug: <String>. "test" - for testing purpose
*
* Example:
*
*/
class ModelConstructor(args: Args) extends Job(args) {
/**
* parse args
*/
val dbTypeArg = args("dbType")
val dbNameArg = args("dbName")
val dbHostArg = args.optional("dbHost")
val dbPortArg = args.optional("dbPort") map (x => x.toInt)
val hdfsRootArg = args("hdfsRoot")
val appidArg = args("appid").toInt
val engineidArg = args("engineid").toInt
val algoidArg = args("algoid").toInt
val evalidArg = args.optional("evalid") map (x => x.toInt)
val OFFLINE_EVAL = (evalidArg != None) // offline eval mode
val debugArg = args.list("debug")
val DEBUG_TEST = debugArg.contains("test") // test mode
val modelSetArg = args("modelSet").toBoolean
val numSimilarItems = args("numSimilarItems").toInt
/**
* source
*/
val similarities = Tsv(AlgoFile(hdfsRootArg, appidArg, engineidArg, algoidArg, evalidArg, "similarities.tsv"), ('iindex, 'simiindex, 'score)).read
val itemsIndex = Tsv(DataFile(hdfsRootArg, appidArg, engineidArg, algoidArg, evalidArg, "itemsIndex.tsv")).read
.mapTo((0, 1, 2) -> ('iindexI, 'iidI, 'itypesI)) { fields: (String, String, String) =>
val (iindex, iid, itypes) = fields // itypes are comma-separated String
(iindex, iid, itypes.split(",").toList)
}
/**
* sink
*/
val ItemSimScoresSink = ItemSimScores(dbType=dbTypeArg, dbName=dbNameArg, dbHost=dbHostArg, dbPort=dbPortArg, algoid=algoidArg, modelset=modelSetArg)
/**
* computation
*/
val sim = similarities.joinWithSmaller('iindex -> 'iindexI, itemsIndex)
.discard('iindex, 'iindexI)
.rename(('iidI, 'itypesI) -> ('iid, 'itypes))
.joinWithSmaller('simiindex -> 'iindexI, itemsIndex)
val sim1 = sim.project('iid, 'iidI, 'itypesI, 'score)
val sim2 = sim.mapTo(('iidI, 'iid, 'itypes, 'score) -> ('iid, 'iidI, 'itypesI, 'score)) { fields: (String, String, List[String], String) => fields }
val combinedSimilarities = sim1 ++ sim2
combinedSimilarities
.groupBy('iid) { _.sortBy('score).reverse.toList[(String, Double, List[String])](('iidI, 'score, 'itypesI) -> 'simiidsList) }
.then ( ItemSimScoresSink.writeData('iid, 'simiidsList, algoidArg, modelSetArg) _ )
}