blob: cf549c0ec8a068a7e1490f3b1759083ff2361aff [file] [log] [blame]
package io.prediction.algorithms.scalding.itemsim.itemsimcf
import com.twitter.scalding._
import cascading.pipe.Pipe
import io.prediction.commons.filepath.{ DataFile, AlgoFile }
/**
* Source: ratings.tsv
* Sink: itemSimScores.tsv
* Descripton:
* Compute item similarity score.
*
* Required args:
* --hdfsRoot: <string>. Root directory of the HDFS
*
* --appid: <int>
* --engineid: <int>
* --algoid: <int>
*
* --measureParam: <string>. distance measurement function. select one of "correl", "cosine", "jaccard"
* --priorCountParam: <int>. for regularization. number of virtual pairs
* --priorCorrelParam: <double>. for regularization. correlation of these virtual pairs
* --minNumRatersParam: <int>. min number of raters of the item
* --maxNumRatersParam: <int> max number of raters of the item
* --minIntersectionParam: <int>. min number of co-rater users between 2 simliar items
* --numSimilarItems: <int>. number of similar items to be generated
*
* Optional args:
* --evalid: <int>. Offline Evaluation if evalid is specified
*
* Example:
* scald.rb --hdfs-local io.prediction.algorithms.scalding.itemsim.itemsimcf.ItemSimilarity --hdfsRoot hdfs/predictionio/ --appid 34 --engineid 2 --algoid 8 --measureParam correl --priorCountParam 20 --priorCorrelParam 0.05
*/
class ItemSimilarity(args: Args) extends VectorSimilarities(args) {
// args
val hdfsRootArg = args("hdfsRoot")
val appidArg = args("appid").toInt
val engineidArg = args("engineid").toInt
val algoidArg = args("algoid").toInt
val evalidArg = args.optional("evalid") map (x => x.toInt)
val measureParamArg = args("measureParam")
val priorCountParamArg = args("priorCountParam").toInt
val priorCorrelParamArg = args("priorCorrelParam").toDouble
val minNumRatersParamArg = args("minNumRatersParam").toInt
val maxNumRatersParamArg = args("maxNumRatersParam").toInt
val minIntersectionParamArg = args("minIntersectionParam").toInt
val numSimilarItemsArg = args("numSimilarItems").toInt
// override VectorSimilarities param
override val MEASURE: String = measureParamArg
override val PRIOR_COUNT: Int = priorCountParamArg
override val PRIOR_CORRELATION: Double = priorCorrelParamArg
override val MIN_NUM_RATERS: Int = minNumRatersParamArg
override val MAX_NUM_RATERS: Int = maxNumRatersParamArg
override val MIN_INTERSECTION: Int = minIntersectionParamArg
override def input(userField: Symbol, itemField: Symbol, ratingField: Symbol): Pipe = {
Tsv(DataFile(hdfsRootArg, appidArg, engineidArg, algoidArg, evalidArg, "ratings.tsv")).read
.mapTo((0, 1, 2) -> (userField, itemField, ratingField)) { fields: (String, String, Double) => fields }
}
// start computation
vectorSimilaritiesAlgo('iid, 'simiid, 'score)
.groupBy('iid) { _.sortBy('score).reverse.take(numSimilarItemsArg) }
.write(Tsv(AlgoFile(hdfsRootArg, appidArg, engineidArg, algoidArg, evalidArg, "itemSimScores.tsv")))
}