This example engine is based on the Similar Product Template version v0.1.2 and is modified to add explicit rate events to the training data.
For example, a user would rate an item with a score or rating. The rating is used to train the model.
Please refer to http://docs.prediction.io/templates/similarproduct/quickstart/
/** One explicit rating given by a user to an item.
  *
  * NOTE(review): the other snippets in this document refer to a `RateEvent`
  * built with the same four fields (user, item, rating, t) -- confirm whether
  * this class is meant to carry that name.
  *
  * @param user   identifier of the user who gave the rating
  * @param item   identifier of the item that was rated
  * @param rating the score the user gave to the item
  * @param t      presumably the time of the rating; the snippets below keep
  *               the value with the larger `t` when a user rates the same
  *               item more than once -- TODO confirm the unit
  */
case class Rating(
  user: String,
  item: String,
  rating: Double,
  t: Long
)
val rateEventsRDD: RDD[RateEvent] = eventsDb.find( appId = dsp.appId, entityType = Some("user"), eventNames = Some(List("rate")... val rateEvent = try { event.event match { case "rate" => RateEvent( user = event.entityId, item = event.targetEntityId.get, rating = event.properties.get[Double]("rating")...
class PreparedData( val users: RDD[(String, User)], val items: RDD[(String, Item)], val rateEvents: RDD[RateEvent]
new PreparedData( users = trainingData.users, items = trainingData.items, rateEvents = trainingData.rateEvents)
def train(sc:SparkContext ,data: PreparedData): ALSModel = { require(!data.rateEvents.take(1).isEmpty, s"rateEvents in PreparedData cannot be empty." +
val mllibRatings = data.rateEvents
val m = ALS.train( ratings = mllibRatings, rank = ap.rank, iterations = ap.numIterations, lambda = ap.lambda, blocks = -1, seed = seed)
val rateEventsRDD: RDD[RateEvent] = eventsDb.find(...)
.reduceByKey { case (v1, v2) => // MODIFIED // if a user may rate same item with different value at different times, // use the latest value for this case. // Can remove this reduceByKey() if no need to support this case. val (rating1, t1) = v1 val (rating2, t2) = v2 // keep the latest value if (t1 > t2) v1 else v2 }
.map { case ((u, i), (rating, t)) => // MODIFIED // MLlibRating requires integer index for user and item MLlibRating(u, i, rating) // MODIFIED }.cache()
/** Bundle of the three datasets produced when reading training data.
  *
  * @param users      RDD of (String, User) pairs; the String is presumably
  *                   the user's entity id -- confirm against the data source
  * @param items      RDD of (String, Item) pairs; the String is presumably
  *                   the item's entity id -- confirm against the data source
  * @param rateEvents RDD of the explicit rate events
  */
class TrainingData(
  val users: RDD[(String, User)],
  val items: RDD[(String, Item)],
  val rateEvents: RDD[RateEvent]
)
new TrainingData( users = usersRDD, items = itemsRDD, rateEvents = rateEventsRDD)