By default, the recommendation template reads the `rate` and `buy` user events as well as the users themselves. You can modify the default DataSource to also read your custom items with a specified list of properties.
First of all, you have to import your events into the PredictionIO (`pio`) Event Server.
You can use ImportDataScript.scala to import users, movies and rate events from the MovieLens dataset. Make sure that the data files are in UTF-8 encoding.
This command-line tool accepts 2 arguments:
1. the app access key (required);
2. the Event Server URL (optional, defaults to http://localhost:7070).
For example, in the sbt console: `> runMain org.template.recommendation.ImportDataScript <access_key>` (`runMain` expects the fully-qualified class name, not the `.scala` file name).
This example is based on v0.1.1 of the Scala parallel recommendation template.
In this example, we modify the DataSource to read a custom item property (`creationYear`).
/** Query sent to the engine.
  *
  * @param user         id of the user to recommend items for
  * @param num          number of items to return
  * @param creationYear optional lower bound on the item's `creationYear`; `None` disables the filter
  */
case class Query(
  user: String,
  num: Int,
  creationYear: Option[Int] = None
)
/** Custom item properties read from the event store.
  *
  * @param creationYear the item's `creationYear` property, if it was set
  */
case class Item(
  creationYear: Option[Int]
)
/** Converts aggregated entity properties into an `Item`. */
object ItemMarshaller {

  /** Builds an `Item` from `properties`, reading the optional `"creationYear"` field.
    *
    * This implementation always returns `Some`. Returning `None` would let a caller
    * that uses `flatMap` drop the entity.
    */
  def unmarshall(properties: DataMap): Option[Item] = {
    val year = properties.getOpt[Int]("creationYear")
    Some(Item(creationYear = year))
  }
}
/** Data handed from the DataSource to the algorithm.
  *
  * @param ratings rating records used for training
  * @param items   (item entity id, custom item properties) pairs
  */
class TrainingData(
  val ratings: RDD[Rating],
  val items: RDD[(String, Item)]
)
// Aggregate the properties of every "item" entity for this app, then keep only
// the entities ItemMarshaller turns into an Item, paired with their entity id.
val itemsRDD = eventsDb
  .aggregateProperties(
    appId = dsp.appId,
    entityType = "item"
  )(sc)
  .flatMap { case (entityId, properties) =>
    ItemMarshaller.unmarshall(properties).map(item => (entityId, item))
  }
/** State produced by training.
  *
  * @param productFeatures  (item index, feature vector) pairs
  * @param itemStringIntMap bidirectional mapping between item string ids and item indices
  * @param items            custom item properties keyed by item index
  */
class ALSModel(
  val productFeatures: RDD[(Int, Array[Double])],
  val itemStringIntMap: BiMap[String, Int],
  val items: Map[Int, Item]
)
// Re-key the custom item properties from string ids to the model's integer indices.
// itemStringIntMap may not contain every item that has properties (presumably it is
// built from rated items only; confirm). Such items are skipped, because
// itemStringIntMap(id) would throw and abort training.
val items: Map[Int, Item] = data.items
  .flatMap { case (id, item) => itemStringIntMap.get(id).map(index => (index, item)) }
  .collectAsMap
  .toMap
/** Drops scored items created before the year requested in the query.
  *
  * A scored item is kept when any of the following holds:
  *  - the query has no `creationYear`;
  *  - the item has no known `creationYear`;
  *  - the item's year is `>=` the query's year.
  *
  * Item indices missing from `items` are treated like items without a
  * `creationYear` and kept, instead of throwing `NoSuchElementException`.
  *
  * @param selectedScores (item index, score) pairs produced by the model
  * @param items          custom item properties keyed by item index
  * @param query          the incoming query
  * @return a lazy view over the retained (item index, score) pairs, in input order
  */
private def filterItems(
    selectedScores: Array[(Int, Double)],
    items: Map[Int, Item],
    query: Query) =
  selectedScores.view.filter { case (iId, _) =>
    items.get(iId).flatMap(_.creationYear).forall { itemYear =>
      query.creationYear.forall(itemYear >= _)
    }
  }
// Apply the query's creationYear filter to the model's scored items.
val filteredScores = filterItems(
  selectedScores = indexScores,
  items = model.items,
  query = query
)
/** Serving component that post-processes the algorithm output.
  *
  * Takes the first predicted result, if there is one, and orders its item scores by
  * `creationYear`, newest first. Scores without a year (`None`) end up last, because
  * `Ordering.Option` ranks `None` lowest and the ordering is reversed.
  * Returns an empty result when no prediction is available.
  */
class Serving extends LServing[Query, PredictedResult] {

  override def serve(query: Query, predictedResults: Seq[PredictedResult]): PredictedResult =
    predictedResults.headOption
      .map { result =>
        // sortBy is stable, so items from the same year keep the algorithm's order.
        val preparedItems = result.itemScores.sortBy {
          case ItemScore(_, _, year) => year
        }(Ordering.Option[Int].reverse)
        new PredictedResult(preparedItems)
      }
      .getOrElse(new PredictedResult(Array.empty[ItemScore]))
}
> curl -H 'Content-Type: application/json' '127.0.0.1:8000/queries.json' -d '{"user":100, "num":5, "creationYear":1990}' | python -m json.tool
The curl output is piped to Python's `json.tool` module purely for convenience, to pretty-print the engine's response:
{ "itemScores": [ { "creationYear": 1996, "item": "831", "score": 518.9319563470217 }, { "creationYear": 1996, "item": "1619", "score": 15.321792791296401 }, { "creationYear": 1994, "item": "1554", "score": 628.1994336041231 }, { "creationYear": 1993, "item": "736", "score": 419.3508956666954 }, { "creationYear": 1991, "item": "627", "score": 498.28818189885175 } ] }
That's it! Your recommendation engine now filters the predicted result set on a custom item field.